cubefs

Форк
0
1268 строк · 38.0 Кб
1
/*
2
 *
3
 * Copyright 2014 gRPC authors.
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 *     http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 *
17
 */
18

19
package transport
20

21
import (
22
	"bytes"
23
	"context"
24
	"errors"
25
	"fmt"
26
	"io"
27
	"math"
28
	"net"
29
	"strconv"
30
	"sync"
31
	"sync/atomic"
32
	"time"
33

34
	"github.com/golang/protobuf/proto"
35
	"golang.org/x/net/http2"
36
	"golang.org/x/net/http2/hpack"
37
	"google.golang.org/grpc/internal/grpcutil"
38

39
	"google.golang.org/grpc/codes"
40
	"google.golang.org/grpc/credentials"
41
	"google.golang.org/grpc/internal/channelz"
42
	"google.golang.org/grpc/internal/grpcrand"
43
	"google.golang.org/grpc/keepalive"
44
	"google.golang.org/grpc/metadata"
45
	"google.golang.org/grpc/peer"
46
	"google.golang.org/grpc/stats"
47
	"google.golang.org/grpc/status"
48
	"google.golang.org/grpc/tap"
49
)
50

51
var (
52
	// ErrIllegalHeaderWrite indicates that setting header is illegal because of
53
	// the stream's state.
54
	ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
55
	// ErrHeaderListSizeLimitViolation indicates that the header list size is larger
56
	// than the limit set by peer.
57
	ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")
58
)
59

60
// serverConnectionCounter counts the number of connections a server has seen
61
// (equal to the number of http2Servers created). Must be accessed atomically.
62
var serverConnectionCounter uint64
63

64
// http2Server implements the ServerTransport interface with HTTP2.
65
type http2Server struct {
66
	lastRead    int64 // Keep this field 64-bit aligned. Accessed atomically.
67
	ctx         context.Context
68
	done        chan struct{}
69
	conn        net.Conn
70
	loopy       *loopyWriter
71
	readerDone  chan struct{} // sync point to enable testing.
72
	writerDone  chan struct{} // sync point to enable testing.
73
	remoteAddr  net.Addr
74
	localAddr   net.Addr
75
	maxStreamID uint32               // max stream ID ever seen
76
	authInfo    credentials.AuthInfo // auth info about the connection
77
	inTapHandle tap.ServerInHandle
78
	framer      *framer
79
	// The max number of concurrent streams.
80
	maxStreams uint32
81
	// controlBuf delivers all the control related tasks (e.g., window
82
	// updates, reset streams, and various settings) to the controller.
83
	controlBuf *controlBuffer
84
	fc         *trInFlow
85
	stats      stats.Handler
86
	// Keepalive and max-age parameters for the server.
87
	kp keepalive.ServerParameters
88
	// Keepalive enforcement policy.
89
	kep keepalive.EnforcementPolicy
90
	// The time instance last ping was received.
91
	lastPingAt time.Time
92
	// Number of times the client has violated keepalive ping policy so far.
93
	pingStrikes uint8
94
	// Flag to signify that number of ping strikes should be reset to 0.
95
	// This is set whenever data or header frames are sent.
96
	// 1 means yes.
97
	resetPingStrikes      uint32 // Accessed atomically.
98
	initialWindowSize     int32
99
	bdpEst                *bdpEstimator
100
	maxSendHeaderListSize *uint32
101

102
	mu sync.Mutex // guard the following
103

104
	// drainChan is initialized when drain(...) is called the first time.
105
	// After which the server writes out the first GoAway(with ID 2^31-1) frame.
106
	// Then an independent goroutine will be launched to later send the second GoAway.
107
	// During this time we don't want to write another first GoAway(with ID 2^31 -1) frame.
108
	// Thus call to drain(...) will be a no-op if drainChan is already initialized since draining is
109
	// already underway.
110
	drainChan     chan struct{}
111
	state         transportState
112
	activeStreams map[uint32]*Stream
113
	// idle is the time instant when the connection went idle.
114
	// This is either the beginning of the connection or when the number of
115
	// RPCs go down to 0.
116
	// When the connection is busy, this value is set to 0.
117
	idle time.Time
118

119
	// Fields below are for channelz metric collection.
120
	channelzID int64 // channelz unique identification number
121
	czData     *channelzData
122
	bufferPool *bufferPool
123

124
	connectionID uint64
125
}
126

127
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
128
// returned if something goes wrong.
129
func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
130
	writeBufSize := config.WriteBufferSize
131
	readBufSize := config.ReadBufferSize
132
	maxHeaderListSize := defaultServerMaxHeaderListSize
133
	if config.MaxHeaderListSize != nil {
134
		maxHeaderListSize = *config.MaxHeaderListSize
135
	}
136
	framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
137
	// Send initial settings as connection preface to client.
138
	isettings := []http2.Setting{{
139
		ID:  http2.SettingMaxFrameSize,
140
		Val: http2MaxFrameLen,
141
	}}
142
	// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
143
	// permitted in the HTTP2 spec.
144
	maxStreams := config.MaxStreams
145
	if maxStreams == 0 {
146
		maxStreams = math.MaxUint32
147
	} else {
148
		isettings = append(isettings, http2.Setting{
149
			ID:  http2.SettingMaxConcurrentStreams,
150
			Val: maxStreams,
151
		})
152
	}
153
	dynamicWindow := true
154
	iwz := int32(initialWindowSize)
155
	if config.InitialWindowSize >= defaultWindowSize {
156
		iwz = config.InitialWindowSize
157
		dynamicWindow = false
158
	}
159
	icwz := int32(initialWindowSize)
160
	if config.InitialConnWindowSize >= defaultWindowSize {
161
		icwz = config.InitialConnWindowSize
162
		dynamicWindow = false
163
	}
164
	if iwz != defaultWindowSize {
165
		isettings = append(isettings, http2.Setting{
166
			ID:  http2.SettingInitialWindowSize,
167
			Val: uint32(iwz)})
168
	}
169
	if config.MaxHeaderListSize != nil {
170
		isettings = append(isettings, http2.Setting{
171
			ID:  http2.SettingMaxHeaderListSize,
172
			Val: *config.MaxHeaderListSize,
173
		})
174
	}
175
	if config.HeaderTableSize != nil {
176
		isettings = append(isettings, http2.Setting{
177
			ID:  http2.SettingHeaderTableSize,
178
			Val: *config.HeaderTableSize,
179
		})
180
	}
181
	if err := framer.fr.WriteSettings(isettings...); err != nil {
182
		return nil, connectionErrorf(false, err, "transport: %v", err)
183
	}
184
	// Adjust the connection flow control window if needed.
185
	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
186
		if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
187
			return nil, connectionErrorf(false, err, "transport: %v", err)
188
		}
189
	}
190
	kp := config.KeepaliveParams
191
	if kp.MaxConnectionIdle == 0 {
192
		kp.MaxConnectionIdle = defaultMaxConnectionIdle
193
	}
194
	if kp.MaxConnectionAge == 0 {
195
		kp.MaxConnectionAge = defaultMaxConnectionAge
196
	}
197
	// Add a jitter to MaxConnectionAge.
198
	kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
199
	if kp.MaxConnectionAgeGrace == 0 {
200
		kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
201
	}
202
	if kp.Time == 0 {
203
		kp.Time = defaultServerKeepaliveTime
204
	}
205
	if kp.Timeout == 0 {
206
		kp.Timeout = defaultServerKeepaliveTimeout
207
	}
208
	kep := config.KeepalivePolicy
209
	if kep.MinTime == 0 {
210
		kep.MinTime = defaultKeepalivePolicyMinTime
211
	}
212
	done := make(chan struct{})
213
	t := &http2Server{
214
		ctx:               context.Background(),
215
		done:              done,
216
		conn:              conn,
217
		remoteAddr:        conn.RemoteAddr(),
218
		localAddr:         conn.LocalAddr(),
219
		authInfo:          config.AuthInfo,
220
		framer:            framer,
221
		readerDone:        make(chan struct{}),
222
		writerDone:        make(chan struct{}),
223
		maxStreams:        maxStreams,
224
		inTapHandle:       config.InTapHandle,
225
		fc:                &trInFlow{limit: uint32(icwz)},
226
		state:             reachable,
227
		activeStreams:     make(map[uint32]*Stream),
228
		stats:             config.StatsHandler,
229
		kp:                kp,
230
		idle:              time.Now(),
231
		kep:               kep,
232
		initialWindowSize: iwz,
233
		czData:            new(channelzData),
234
		bufferPool:        newBufferPool(),
235
	}
236
	t.controlBuf = newControlBuffer(t.done)
237
	if dynamicWindow {
238
		t.bdpEst = &bdpEstimator{
239
			bdp:               initialWindowSize,
240
			updateFlowControl: t.updateFlowControl,
241
		}
242
	}
243
	if t.stats != nil {
244
		t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
245
			RemoteAddr: t.remoteAddr,
246
			LocalAddr:  t.localAddr,
247
		})
248
		connBegin := &stats.ConnBegin{}
249
		t.stats.HandleConn(t.ctx, connBegin)
250
	}
251
	if channelz.IsOn() {
252
		t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
253
	}
254

255
	t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1)
256

257
	t.framer.writer.Flush()
258

259
	defer func() {
260
		if err != nil {
261
			t.Close()
262
		}
263
	}()
264

265
	// Check the validity of client preface.
266
	preface := make([]byte, len(clientPreface))
267
	if _, err := io.ReadFull(t.conn, preface); err != nil {
268
		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
269
	}
270
	if !bytes.Equal(preface, clientPreface) {
271
		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
272
	}
273

274
	frame, err := t.framer.fr.ReadFrame()
275
	if err == io.EOF || err == io.ErrUnexpectedEOF {
276
		return nil, err
277
	}
278
	if err != nil {
279
		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
280
	}
281
	atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
282
	sf, ok := frame.(*http2.SettingsFrame)
283
	if !ok {
284
		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
285
	}
286
	t.handleSettings(sf)
287

288
	go func() {
289
		t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
290
		t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
291
		if err := t.loopy.run(); err != nil {
292
			if logger.V(logLevel) {
293
				logger.Errorf("transport: loopyWriter.run returning. Err: %v", err)
294
			}
295
		}
296
		t.conn.Close()
297
		close(t.writerDone)
298
	}()
299
	go t.keepalive()
300
	return t, nil
301
}
302

303
// operateHeader takes action on the decoded headers.
304
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
305
	streamID := frame.Header().StreamID
306
	state := &decodeState{
307
		serverSide: true,
308
	}
309
	if h2code, err := state.decodeHeader(frame); err != nil {
310
		if _, ok := status.FromError(err); ok {
311
			t.controlBuf.put(&cleanupStream{
312
				streamID: streamID,
313
				rst:      true,
314
				rstCode:  h2code,
315
				onWrite:  func() {},
316
			})
317
		}
318
		return false
319
	}
320

321
	buf := newRecvBuffer()
322
	s := &Stream{
323
		id:             streamID,
324
		st:             t,
325
		buf:            buf,
326
		fc:             &inFlow{limit: uint32(t.initialWindowSize)},
327
		recvCompress:   state.data.encoding,
328
		method:         state.data.method,
329
		contentSubtype: state.data.contentSubtype,
330
	}
331
	if frame.StreamEnded() {
332
		// s is just created by the caller. No lock needed.
333
		s.state = streamReadDone
334
	}
335
	if state.data.timeoutSet {
336
		s.ctx, s.cancel = context.WithTimeout(t.ctx, state.data.timeout)
337
	} else {
338
		s.ctx, s.cancel = context.WithCancel(t.ctx)
339
	}
340
	pr := &peer.Peer{
341
		Addr: t.remoteAddr,
342
	}
343
	// Attach Auth info if there is any.
344
	if t.authInfo != nil {
345
		pr.AuthInfo = t.authInfo
346
	}
347
	s.ctx = peer.NewContext(s.ctx, pr)
348
	// Attach the received metadata to the context.
349
	if len(state.data.mdata) > 0 {
350
		s.ctx = metadata.NewIncomingContext(s.ctx, state.data.mdata)
351
	}
352
	if state.data.statsTags != nil {
353
		s.ctx = stats.SetIncomingTags(s.ctx, state.data.statsTags)
354
	}
355
	if state.data.statsTrace != nil {
356
		s.ctx = stats.SetIncomingTrace(s.ctx, state.data.statsTrace)
357
	}
358
	if t.inTapHandle != nil {
359
		var err error
360
		info := &tap.Info{
361
			FullMethodName: state.data.method,
362
		}
363
		s.ctx, err = t.inTapHandle(s.ctx, info)
364
		if err != nil {
365
			if logger.V(logLevel) {
366
				logger.Warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
367
			}
368
			t.controlBuf.put(&cleanupStream{
369
				streamID: s.id,
370
				rst:      true,
371
				rstCode:  http2.ErrCodeRefusedStream,
372
				onWrite:  func() {},
373
			})
374
			s.cancel()
375
			return false
376
		}
377
	}
378
	t.mu.Lock()
379
	if t.state != reachable {
380
		t.mu.Unlock()
381
		s.cancel()
382
		return false
383
	}
384
	if uint32(len(t.activeStreams)) >= t.maxStreams {
385
		t.mu.Unlock()
386
		t.controlBuf.put(&cleanupStream{
387
			streamID: streamID,
388
			rst:      true,
389
			rstCode:  http2.ErrCodeRefusedStream,
390
			onWrite:  func() {},
391
		})
392
		s.cancel()
393
		return false
394
	}
395
	if streamID%2 != 1 || streamID <= t.maxStreamID {
396
		t.mu.Unlock()
397
		// illegal gRPC stream id.
398
		if logger.V(logLevel) {
399
			logger.Errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
400
		}
401
		s.cancel()
402
		return true
403
	}
404
	t.maxStreamID = streamID
405
	t.activeStreams[streamID] = s
406
	if len(t.activeStreams) == 1 {
407
		t.idle = time.Time{}
408
	}
409
	t.mu.Unlock()
410
	if channelz.IsOn() {
411
		atomic.AddInt64(&t.czData.streamsStarted, 1)
412
		atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
413
	}
414
	s.requestRead = func(n int) {
415
		t.adjustWindow(s, uint32(n))
416
	}
417
	s.ctx = traceCtx(s.ctx, s.method)
418
	if t.stats != nil {
419
		s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
420
		inHeader := &stats.InHeader{
421
			FullMethod:  s.method,
422
			RemoteAddr:  t.remoteAddr,
423
			LocalAddr:   t.localAddr,
424
			Compression: s.recvCompress,
425
			WireLength:  int(frame.Header().Length),
426
			Header:      metadata.MD(state.data.mdata).Copy(),
427
		}
428
		t.stats.HandleRPC(s.ctx, inHeader)
429
	}
430
	s.ctxDone = s.ctx.Done()
431
	s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
432
	s.trReader = &transportReader{
433
		reader: &recvBufferReader{
434
			ctx:        s.ctx,
435
			ctxDone:    s.ctxDone,
436
			recv:       s.buf,
437
			freeBuffer: t.bufferPool.put,
438
		},
439
		windowHandler: func(n int) {
440
			t.updateWindow(s, uint32(n))
441
		},
442
	}
443
	// Register the stream with loopy.
444
	t.controlBuf.put(&registerStream{
445
		streamID: s.id,
446
		wq:       s.wq,
447
	})
448
	handle(s)
449
	return false
450
}
451

452
// HandleStreams receives incoming streams using the given handler. This is
453
// typically run in a separate goroutine.
454
// traceCtx attaches trace to ctx and returns the new context.
455
func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
456
	defer close(t.readerDone)
457
	for {
458
		t.controlBuf.throttle()
459
		frame, err := t.framer.fr.ReadFrame()
460
		atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
461
		if err != nil {
462
			if se, ok := err.(http2.StreamError); ok {
463
				if logger.V(logLevel) {
464
					logger.Warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
465
				}
466
				t.mu.Lock()
467
				s := t.activeStreams[se.StreamID]
468
				t.mu.Unlock()
469
				if s != nil {
470
					t.closeStream(s, true, se.Code, false)
471
				} else {
472
					t.controlBuf.put(&cleanupStream{
473
						streamID: se.StreamID,
474
						rst:      true,
475
						rstCode:  se.Code,
476
						onWrite:  func() {},
477
					})
478
				}
479
				continue
480
			}
481
			if err == io.EOF || err == io.ErrUnexpectedEOF {
482
				t.Close()
483
				return
484
			}
485
			if logger.V(logLevel) {
486
				logger.Warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
487
			}
488
			t.Close()
489
			return
490
		}
491
		switch frame := frame.(type) {
492
		case *http2.MetaHeadersFrame:
493
			if t.operateHeaders(frame, handle, traceCtx) {
494
				t.Close()
495
				break
496
			}
497
		case *http2.DataFrame:
498
			t.handleData(frame)
499
		case *http2.RSTStreamFrame:
500
			t.handleRSTStream(frame)
501
		case *http2.SettingsFrame:
502
			t.handleSettings(frame)
503
		case *http2.PingFrame:
504
			t.handlePing(frame)
505
		case *http2.WindowUpdateFrame:
506
			t.handleWindowUpdate(frame)
507
		case *http2.GoAwayFrame:
508
			// TODO: Handle GoAway from the client appropriately.
509
		default:
510
			if logger.V(logLevel) {
511
				logger.Errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
512
			}
513
		}
514
	}
515
}
516

517
func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
518
	t.mu.Lock()
519
	defer t.mu.Unlock()
520
	if t.activeStreams == nil {
521
		// The transport is closing.
522
		return nil, false
523
	}
524
	s, ok := t.activeStreams[f.Header().StreamID]
525
	if !ok {
526
		// The stream is already done.
527
		return nil, false
528
	}
529
	return s, true
530
}
531

532
// adjustWindow sends out extra window update over the initial window size
533
// of stream if the application is requesting data larger in size than
534
// the window.
535
func (t *http2Server) adjustWindow(s *Stream, n uint32) {
536
	if w := s.fc.maybeAdjust(n); w > 0 {
537
		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
538
	}
539

540
}
541

542
// updateWindow adjusts the inbound quota for the stream and the transport.
543
// Window updates will deliver to the controller for sending when
544
// the cumulative quota exceeds the corresponding threshold.
545
func (t *http2Server) updateWindow(s *Stream, n uint32) {
546
	if w := s.fc.onRead(n); w > 0 {
547
		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
548
			increment: w,
549
		})
550
	}
551
}
552

553
// updateFlowControl updates the incoming flow control windows
554
// for the transport and the stream based on the current bdp
555
// estimation.
556
func (t *http2Server) updateFlowControl(n uint32) {
557
	t.mu.Lock()
558
	for _, s := range t.activeStreams {
559
		s.fc.newLimit(n)
560
	}
561
	t.initialWindowSize = int32(n)
562
	t.mu.Unlock()
563
	t.controlBuf.put(&outgoingWindowUpdate{
564
		streamID:  0,
565
		increment: t.fc.newLimit(n),
566
	})
567
	t.controlBuf.put(&outgoingSettings{
568
		ss: []http2.Setting{
569
			{
570
				ID:  http2.SettingInitialWindowSize,
571
				Val: n,
572
			},
573
		},
574
	})
575

576
}
577

578
func (t *http2Server) handleData(f *http2.DataFrame) {
579
	size := f.Header().Length
580
	var sendBDPPing bool
581
	if t.bdpEst != nil {
582
		sendBDPPing = t.bdpEst.add(size)
583
	}
584
	// Decouple connection's flow control from application's read.
585
	// An update on connection's flow control should not depend on
586
	// whether user application has read the data or not. Such a
587
	// restriction is already imposed on the stream's flow control,
588
	// and therefore the sender will be blocked anyways.
589
	// Decoupling the connection flow control will prevent other
590
	// active(fast) streams from starving in presence of slow or
591
	// inactive streams.
592
	if w := t.fc.onData(size); w > 0 {
593
		t.controlBuf.put(&outgoingWindowUpdate{
594
			streamID:  0,
595
			increment: w,
596
		})
597
	}
598
	if sendBDPPing {
599
		// Avoid excessive ping detection (e.g. in an L7 proxy)
600
		// by sending a window update prior to the BDP ping.
601
		if w := t.fc.reset(); w > 0 {
602
			t.controlBuf.put(&outgoingWindowUpdate{
603
				streamID:  0,
604
				increment: w,
605
			})
606
		}
607
		t.controlBuf.put(bdpPing)
608
	}
609
	// Select the right stream to dispatch.
610
	s, ok := t.getStream(f)
611
	if !ok {
612
		return
613
	}
614
	if s.getState() == streamReadDone {
615
		t.closeStream(s, true, http2.ErrCodeStreamClosed, false)
616
		return
617
	}
618
	if size > 0 {
619
		if err := s.fc.onData(size); err != nil {
620
			t.closeStream(s, true, http2.ErrCodeFlowControl, false)
621
			return
622
		}
623
		if f.Header().Flags.Has(http2.FlagDataPadded) {
624
			if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
625
				t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
626
			}
627
		}
628
		// TODO(bradfitz, zhaoq): A copy is required here because there is no
629
		// guarantee f.Data() is consumed before the arrival of next frame.
630
		// Can this copy be eliminated?
631
		if len(f.Data()) > 0 {
632
			buffer := t.bufferPool.get()
633
			buffer.Reset()
634
			buffer.Write(f.Data())
635
			s.write(recvMsg{buffer: buffer})
636
		}
637
	}
638
	if f.Header().Flags.Has(http2.FlagDataEndStream) {
639
		// Received the end of stream from the client.
640
		s.compareAndSwapState(streamActive, streamReadDone)
641
		s.write(recvMsg{err: io.EOF})
642
	}
643
}
644

645
func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
646
	// If the stream is not deleted from the transport's active streams map, then do a regular close stream.
647
	if s, ok := t.getStream(f); ok {
648
		t.closeStream(s, false, 0, false)
649
		return
650
	}
651
	// If the stream is already deleted from the active streams map, then put a cleanupStream item into controlbuf to delete the stream from loopy writer's established streams map.
652
	t.controlBuf.put(&cleanupStream{
653
		streamID: f.Header().StreamID,
654
		rst:      false,
655
		rstCode:  0,
656
		onWrite:  func() {},
657
	})
658
}
659

660
func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
661
	if f.IsAck() {
662
		return
663
	}
664
	var ss []http2.Setting
665
	var updateFuncs []func()
666
	f.ForeachSetting(func(s http2.Setting) error {
667
		switch s.ID {
668
		case http2.SettingMaxHeaderListSize:
669
			updateFuncs = append(updateFuncs, func() {
670
				t.maxSendHeaderListSize = new(uint32)
671
				*t.maxSendHeaderListSize = s.Val
672
			})
673
		default:
674
			ss = append(ss, s)
675
		}
676
		return nil
677
	})
678
	t.controlBuf.executeAndPut(func(interface{}) bool {
679
		for _, f := range updateFuncs {
680
			f()
681
		}
682
		return true
683
	}, &incomingSettings{
684
		ss: ss,
685
	})
686
}
687

688
const (
689
	maxPingStrikes     = 2
690
	defaultPingTimeout = 2 * time.Hour
691
)
692

693
func (t *http2Server) handlePing(f *http2.PingFrame) {
694
	if f.IsAck() {
695
		if f.Data == goAwayPing.data && t.drainChan != nil {
696
			close(t.drainChan)
697
			return
698
		}
699
		// Maybe it's a BDP ping.
700
		if t.bdpEst != nil {
701
			t.bdpEst.calculate(f.Data)
702
		}
703
		return
704
	}
705
	pingAck := &ping{ack: true}
706
	copy(pingAck.data[:], f.Data[:])
707
	t.controlBuf.put(pingAck)
708

709
	now := time.Now()
710
	defer func() {
711
		t.lastPingAt = now
712
	}()
713
	// A reset ping strikes means that we don't need to check for policy
714
	// violation for this ping and the pingStrikes counter should be set
715
	// to 0.
716
	if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
717
		t.pingStrikes = 0
718
		return
719
	}
720
	t.mu.Lock()
721
	ns := len(t.activeStreams)
722
	t.mu.Unlock()
723
	if ns < 1 && !t.kep.PermitWithoutStream {
724
		// Keepalive shouldn't be active thus, this new ping should
725
		// have come after at least defaultPingTimeout.
726
		if t.lastPingAt.Add(defaultPingTimeout).After(now) {
727
			t.pingStrikes++
728
		}
729
	} else {
730
		// Check if keepalive policy is respected.
731
		if t.lastPingAt.Add(t.kep.MinTime).After(now) {
732
			t.pingStrikes++
733
		}
734
	}
735

736
	if t.pingStrikes > maxPingStrikes {
737
		// Send goaway and close the connection.
738
		if logger.V(logLevel) {
739
			logger.Errorf("transport: Got too many pings from the client, closing the connection.")
740
		}
741
		t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
742
	}
743
}
744

745
func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
746
	t.controlBuf.put(&incomingWindowUpdate{
747
		streamID:  f.Header().StreamID,
748
		increment: f.Increment,
749
	})
750
}
751

752
func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
753
	for k, vv := range md {
754
		if isReservedHeader(k) {
755
			// Clients don't tolerate reading restricted headers after some non restricted ones were sent.
756
			continue
757
		}
758
		for _, v := range vv {
759
			headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
760
		}
761
	}
762
	return headerFields
763
}
764

765
func (t *http2Server) checkForHeaderListSize(it interface{}) bool {
766
	if t.maxSendHeaderListSize == nil {
767
		return true
768
	}
769
	hdrFrame := it.(*headerFrame)
770
	var sz int64
771
	for _, f := range hdrFrame.hf {
772
		if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
773
			if logger.V(logLevel) {
774
				logger.Errorf("header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
775
			}
776
			return false
777
		}
778
	}
779
	return true
780
}
781

782
// WriteHeader sends the header metadata md back to the client.
783
func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
784
	if s.updateHeaderSent() || s.getState() == streamDone {
785
		return ErrIllegalHeaderWrite
786
	}
787
	s.hdrMu.Lock()
788
	if md.Len() > 0 {
789
		if s.header.Len() > 0 {
790
			s.header = metadata.Join(s.header, md)
791
		} else {
792
			s.header = md
793
		}
794
	}
795
	if err := t.writeHeaderLocked(s); err != nil {
796
		s.hdrMu.Unlock()
797
		return err
798
	}
799
	s.hdrMu.Unlock()
800
	return nil
801
}
802

803
func (t *http2Server) setResetPingStrikes() {
804
	atomic.StoreUint32(&t.resetPingStrikes, 1)
805
}
806

807
func (t *http2Server) writeHeaderLocked(s *Stream) error {
808
	// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
809
	// first and create a slice of that exact size.
810
	headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
811
	headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
812
	headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)})
813
	if s.sendCompress != "" {
814
		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
815
	}
816
	headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
817
	success, err := t.controlBuf.executeAndPut(t.checkForHeaderListSize, &headerFrame{
818
		streamID:  s.id,
819
		hf:        headerFields,
820
		endStream: false,
821
		onWrite:   t.setResetPingStrikes,
822
	})
823
	if !success {
824
		if err != nil {
825
			return err
826
		}
827
		t.closeStream(s, true, http2.ErrCodeInternal, false)
828
		return ErrHeaderListSizeLimitViolation
829
	}
830
	if t.stats != nil {
831
		// Note: Headers are compressed with hpack after this call returns.
832
		// No WireLength field is set here.
833
		outHeader := &stats.OutHeader{
834
			Header:      s.header.Copy(),
835
			Compression: s.sendCompress,
836
		}
837
		t.stats.HandleRPC(s.Context(), outHeader)
838
	}
839
	return nil
840
}
841

842
// WriteStatus sends stream status to the client and terminates the stream.
843
// There is no further I/O operations being able to perform on this stream.
844
// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
845
// OK is adopted.
846
func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
847
	if s.getState() == streamDone {
848
		return nil
849
	}
850
	s.hdrMu.Lock()
851
	// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
852
	// first and create a slice of that exact size.
853
	headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
854
	if !s.updateHeaderSent() {                      // No headers have been sent.
855
		if len(s.header) > 0 { // Send a separate header frame.
856
			if err := t.writeHeaderLocked(s); err != nil {
857
				s.hdrMu.Unlock()
858
				return err
859
			}
860
		} else { // Send a trailer only response.
861
			headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
862
			headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)})
863
		}
864
	}
865
	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
866
	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
867

868
	if p := st.Proto(); p != nil && len(p.Details) > 0 {
869
		stBytes, err := proto.Marshal(p)
870
		if err != nil {
871
			// TODO: return error instead, when callers are able to handle it.
872
			logger.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err)
873
		} else {
874
			headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
875
		}
876
	}
877

878
	// Attach the trailer metadata.
879
	headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
880
	trailingHeader := &headerFrame{
881
		streamID:  s.id,
882
		hf:        headerFields,
883
		endStream: true,
884
		onWrite:   t.setResetPingStrikes,
885
	}
886
	s.hdrMu.Unlock()
887
	success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
888
	if !success {
889
		if err != nil {
890
			return err
891
		}
892
		t.closeStream(s, true, http2.ErrCodeInternal, false)
893
		return ErrHeaderListSizeLimitViolation
894
	}
895
	// Send a RST_STREAM after the trailers if the client has not already half-closed.
896
	rst := s.getState() == streamActive
897
	t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
898
	if t.stats != nil {
899
		// Note: The trailer fields are compressed with hpack after this call returns.
900
		// No WireLength field is set here.
901
		t.stats.HandleRPC(s.Context(), &stats.OutTrailer{
902
			Trailer: s.trailer.Copy(),
903
		})
904
	}
905
	return nil
906
}
907

908
// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
909
// is returns if it fails (e.g., framing error, transport error).
910
func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
911
	if !s.isHeaderSent() { // Headers haven't been written yet.
912
		if err := t.WriteHeader(s, nil); err != nil {
913
			if _, ok := err.(ConnectionError); ok {
914
				return err
915
			}
916
			// TODO(mmukhi, dfawley): Make sure this is the right code to return.
917
			return status.Errorf(codes.Internal, "transport: %v", err)
918
		}
919
	} else {
920
		// Writing headers checks for this condition.
921
		if s.getState() == streamDone {
922
			// TODO(mmukhi, dfawley): Should the server write also return io.EOF?
923
			s.cancel()
924
			select {
925
			case <-t.done:
926
				return ErrConnClosing
927
			default:
928
			}
929
			return ContextErr(s.ctx.Err())
930
		}
931
	}
932
	df := &dataFrame{
933
		streamID:    s.id,
934
		h:           hdr,
935
		d:           data,
936
		onEachWrite: t.setResetPingStrikes,
937
	}
938
	if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
939
		select {
940
		case <-t.done:
941
			return ErrConnClosing
942
		default:
943
		}
944
		return ContextErr(s.ctx.Err())
945
	}
946
	return t.controlBuf.put(df)
947
}
948

949
// keepalive running in a separate goroutine does the following:
950
// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
951
// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
952
// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
953
// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection
954
// after an additional duration of keepalive.Timeout.
955
func (t *http2Server) keepalive() {
956
	p := &ping{}
957
	// True iff a ping has been sent, and no data has been received since then.
958
	outstandingPing := false
959
	// Amount of time remaining before which we should receive an ACK for the
960
	// last sent ping.
961
	kpTimeoutLeft := time.Duration(0)
962
	// Records the last value of t.lastRead before we go block on the timer.
963
	// This is required to check for read activity since then.
964
	prevNano := time.Now().UnixNano()
965
	// Initialize the different timers to their default values.
966
	idleTimer := time.NewTimer(t.kp.MaxConnectionIdle)
967
	ageTimer := time.NewTimer(t.kp.MaxConnectionAge)
968
	kpTimer := time.NewTimer(t.kp.Time)
969
	defer func() {
970
		// We need to drain the underlying channel in these timers after a call
971
		// to Stop(), only if we are interested in resetting them. Clearly we
972
		// are not interested in resetting them here.
973
		idleTimer.Stop()
974
		ageTimer.Stop()
975
		kpTimer.Stop()
976
	}()
977

978
	for {
979
		select {
980
		case <-idleTimer.C:
981
			t.mu.Lock()
982
			idle := t.idle
983
			if idle.IsZero() { // The connection is non-idle.
984
				t.mu.Unlock()
985
				idleTimer.Reset(t.kp.MaxConnectionIdle)
986
				continue
987
			}
988
			val := t.kp.MaxConnectionIdle - time.Since(idle)
989
			t.mu.Unlock()
990
			if val <= 0 {
991
				// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
992
				// Gracefully close the connection.
993
				t.drain(http2.ErrCodeNo, []byte{})
994
				return
995
			}
996
			idleTimer.Reset(val)
997
		case <-ageTimer.C:
998
			t.drain(http2.ErrCodeNo, []byte{})
999
			ageTimer.Reset(t.kp.MaxConnectionAgeGrace)
1000
			select {
1001
			case <-ageTimer.C:
1002
				// Close the connection after grace period.
1003
				if logger.V(logLevel) {
1004
					logger.Infof("transport: closing server transport due to maximum connection age.")
1005
				}
1006
				t.Close()
1007
			case <-t.done:
1008
			}
1009
			return
1010
		case <-kpTimer.C:
1011
			lastRead := atomic.LoadInt64(&t.lastRead)
1012
			if lastRead > prevNano {
1013
				// There has been read activity since the last time we were
1014
				// here. Setup the timer to fire at kp.Time seconds from
1015
				// lastRead time and continue.
1016
				outstandingPing = false
1017
				kpTimer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
1018
				prevNano = lastRead
1019
				continue
1020
			}
1021
			if outstandingPing && kpTimeoutLeft <= 0 {
1022
				if logger.V(logLevel) {
1023
					logger.Infof("transport: closing server transport due to idleness.")
1024
				}
1025
				t.Close()
1026
				return
1027
			}
1028
			if !outstandingPing {
1029
				if channelz.IsOn() {
1030
					atomic.AddInt64(&t.czData.kpCount, 1)
1031
				}
1032
				t.controlBuf.put(p)
1033
				kpTimeoutLeft = t.kp.Timeout
1034
				outstandingPing = true
1035
			}
1036
			// The amount of time to sleep here is the minimum of kp.Time and
1037
			// timeoutLeft. This will ensure that we wait only for kp.Time
1038
			// before sending out the next ping (for cases where the ping is
1039
			// acked).
1040
			sleepDuration := minTime(t.kp.Time, kpTimeoutLeft)
1041
			kpTimeoutLeft -= sleepDuration
1042
			kpTimer.Reset(sleepDuration)
1043
		case <-t.done:
1044
			return
1045
		}
1046
	}
1047
}
1048

1049
// Close starts shutting down the http2Server transport.
1050
// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
1051
// could cause some resource issue. Revisit this later.
1052
func (t *http2Server) Close() error {
1053
	t.mu.Lock()
1054
	if t.state == closing {
1055
		t.mu.Unlock()
1056
		return errors.New("transport: Close() was already called")
1057
	}
1058
	t.state = closing
1059
	streams := t.activeStreams
1060
	t.activeStreams = nil
1061
	t.mu.Unlock()
1062
	t.controlBuf.finish()
1063
	close(t.done)
1064
	err := t.conn.Close()
1065
	if channelz.IsOn() {
1066
		channelz.RemoveEntry(t.channelzID)
1067
	}
1068
	// Cancel all active streams.
1069
	for _, s := range streams {
1070
		s.cancel()
1071
	}
1072
	if t.stats != nil {
1073
		connEnd := &stats.ConnEnd{}
1074
		t.stats.HandleConn(t.ctx, connEnd)
1075
	}
1076
	return err
1077
}
1078

1079
// deleteStream deletes the stream s from transport's active streams.
1080
func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
1081
	// In case stream sending and receiving are invoked in separate
1082
	// goroutines (e.g., bi-directional streaming), cancel needs to be
1083
	// called to interrupt the potential blocking on other goroutines.
1084
	s.cancel()
1085

1086
	t.mu.Lock()
1087
	if _, ok := t.activeStreams[s.id]; ok {
1088
		delete(t.activeStreams, s.id)
1089
		if len(t.activeStreams) == 0 {
1090
			t.idle = time.Now()
1091
		}
1092
	}
1093
	t.mu.Unlock()
1094

1095
	if channelz.IsOn() {
1096
		if eosReceived {
1097
			atomic.AddInt64(&t.czData.streamsSucceeded, 1)
1098
		} else {
1099
			atomic.AddInt64(&t.czData.streamsFailed, 1)
1100
		}
1101
	}
1102
}
1103

1104
// finishStream closes the stream and puts the trailing headerFrame into controlbuf.
1105
func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
1106
	oldState := s.swapState(streamDone)
1107
	if oldState == streamDone {
1108
		// If the stream was already done, return.
1109
		return
1110
	}
1111

1112
	hdr.cleanup = &cleanupStream{
1113
		streamID: s.id,
1114
		rst:      rst,
1115
		rstCode:  rstCode,
1116
		onWrite: func() {
1117
			t.deleteStream(s, eosReceived)
1118
		},
1119
	}
1120
	t.controlBuf.put(hdr)
1121
}
1122

1123
// closeStream clears the footprint of a stream when the stream is not needed any more.
1124
func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
1125
	s.swapState(streamDone)
1126
	t.deleteStream(s, eosReceived)
1127

1128
	t.controlBuf.put(&cleanupStream{
1129
		streamID: s.id,
1130
		rst:      rst,
1131
		rstCode:  rstCode,
1132
		onWrite:  func() {},
1133
	})
1134
}
1135

1136
func (t *http2Server) RemoteAddr() net.Addr {
1137
	return t.remoteAddr
1138
}
1139

1140
func (t *http2Server) Drain() {
1141
	t.drain(http2.ErrCodeNo, []byte{})
1142
}
1143

1144
func (t *http2Server) drain(code http2.ErrCode, debugData []byte) {
1145
	t.mu.Lock()
1146
	defer t.mu.Unlock()
1147
	if t.drainChan != nil {
1148
		return
1149
	}
1150
	t.drainChan = make(chan struct{})
1151
	t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true})
1152
}
1153

1154
var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
1155

1156
// Handles outgoing GoAway and returns true if loopy needs to put itself
1157
// in draining mode.
1158
func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
1159
	t.mu.Lock()
1160
	if t.state == closing { // TODO(mmukhi): This seems unnecessary.
1161
		t.mu.Unlock()
1162
		// The transport is closing.
1163
		return false, ErrConnClosing
1164
	}
1165
	sid := t.maxStreamID
1166
	if !g.headsUp {
1167
		// Stop accepting more streams now.
1168
		t.state = draining
1169
		if len(t.activeStreams) == 0 {
1170
			g.closeConn = true
1171
		}
1172
		t.mu.Unlock()
1173
		if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
1174
			return false, err
1175
		}
1176
		if g.closeConn {
1177
			// Abruptly close the connection following the GoAway (via
1178
			// loopywriter).  But flush out what's inside the buffer first.
1179
			t.framer.writer.Flush()
1180
			return false, fmt.Errorf("transport: Connection closing")
1181
		}
1182
		return true, nil
1183
	}
1184
	t.mu.Unlock()
1185
	// For a graceful close, send out a GoAway with stream ID of MaxUInt32,
1186
	// Follow that with a ping and wait for the ack to come back or a timer
1187
	// to expire. During this time accept new streams since they might have
1188
	// originated before the GoAway reaches the client.
1189
	// After getting the ack or timer expiration send out another GoAway this
1190
	// time with an ID of the max stream server intends to process.
1191
	if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil {
1192
		return false, err
1193
	}
1194
	if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
1195
		return false, err
1196
	}
1197
	go func() {
1198
		timer := time.NewTimer(time.Minute)
1199
		defer timer.Stop()
1200
		select {
1201
		case <-t.drainChan:
1202
		case <-timer.C:
1203
		case <-t.done:
1204
			return
1205
		}
1206
		t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
1207
	}()
1208
	return false, nil
1209
}
1210

1211
func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
1212
	s := channelz.SocketInternalMetric{
1213
		StreamsStarted:                   atomic.LoadInt64(&t.czData.streamsStarted),
1214
		StreamsSucceeded:                 atomic.LoadInt64(&t.czData.streamsSucceeded),
1215
		StreamsFailed:                    atomic.LoadInt64(&t.czData.streamsFailed),
1216
		MessagesSent:                     atomic.LoadInt64(&t.czData.msgSent),
1217
		MessagesReceived:                 atomic.LoadInt64(&t.czData.msgRecv),
1218
		KeepAlivesSent:                   atomic.LoadInt64(&t.czData.kpCount),
1219
		LastRemoteStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
1220
		LastMessageSentTimestamp:         time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
1221
		LastMessageReceivedTimestamp:     time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
1222
		LocalFlowControlWindow:           int64(t.fc.getSize()),
1223
		SocketOptions:                    channelz.GetSocketOption(t.conn),
1224
		LocalAddr:                        t.localAddr,
1225
		RemoteAddr:                       t.remoteAddr,
1226
		// RemoteName :
1227
	}
1228
	if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
1229
		s.Security = au.GetSecurityValue()
1230
	}
1231
	s.RemoteFlowControlWindow = t.getOutFlowWindow()
1232
	return &s
1233
}
1234

1235
func (t *http2Server) IncrMsgSent() {
1236
	atomic.AddInt64(&t.czData.msgSent, 1)
1237
	atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
1238
}
1239

1240
func (t *http2Server) IncrMsgRecv() {
1241
	atomic.AddInt64(&t.czData.msgRecv, 1)
1242
	atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
1243
}
1244

1245
func (t *http2Server) getOutFlowWindow() int64 {
1246
	resp := make(chan uint32, 1)
1247
	timer := time.NewTimer(time.Second)
1248
	defer timer.Stop()
1249
	t.controlBuf.put(&outFlowControlSizeRequest{resp})
1250
	select {
1251
	case sz := <-resp:
1252
		return int64(sz)
1253
	case <-t.done:
1254
		return -1
1255
	case <-timer.C:
1256
		return -2
1257
	}
1258
}
1259

1260
func getJitter(v time.Duration) time.Duration {
1261
	if v == infinity {
1262
		return 0
1263
	}
1264
	// Generate a jitter between +/- 10% of the value.
1265
	r := int64(v / 10)
1266
	j := grpcrand.Int63n(2*r) - r
1267
	return time.Duration(j)
1268
}
1269

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.