/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package transport

import (
	"bytes"
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"

	"golang.org/x/net/http2"
	"golang.org/x/net/http2/hpack"
)

var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) {
	e.SetMaxDynamicTableSizeLimit(v)
}

type itemNode struct {
	it   interface{}
	next *itemNode
}

type itemList struct {
	head *itemNode
	tail *itemNode
}

func (il *itemList) enqueue(i interface{}) {
	n := &itemNode{it: i}
	if il.tail == nil {
		il.head, il.tail = n, n
		return
	}
	il.tail.next = n
	il.tail = n
}

// peek returns the first item in the list without removing it from the
// list.
func (il *itemList) peek() interface{} {
	return il.head.it
}

func (il *itemList) dequeue() interface{} {
	if il.head == nil {
		return nil
	}
	i := il.head.it
	il.head = il.head.next
	if il.head == nil {
		il.tail = nil
	}
	return i
}

func (il *itemList) dequeueAll() *itemNode {
	h := il.head
	il.head, il.tail = nil, nil
	return h
}

func (il *itemList) isEmpty() bool {
	return il.head == nil
}
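
// Illustrative sketch, not part of the original file: itemList is a plain
// FIFO, so enqueue appends at the tail and dequeue pops from the head, both
// in O(1). The helper below is hypothetical and exists only to show the
// order of operations.
func exampleItemListFIFO() {
	il := &itemList{}
	il.enqueue(&ping{ack: true})        // first in
	il.enqueue(&dataFrame{streamID: 1}) // second in
	first := il.dequeue()               // returns the *ping that was enqueued first
	_ = first
	for !il.isEmpty() {
		il.dequeue() // drain whatever is left
	}
}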

// The following defines various control items which could flow through
// the control buffer of transport. They represent different aspects of
// control tasks, e.g., flow control, settings, stream resetting, etc.

// maxQueuedTransportResponseFrames is the most queued "transport response"
// frames we will buffer before preventing new reads from occurring on the
// transport.  These are control frames sent in response to client requests,
// such as RST_STREAM due to bad headers or settings acks.
const maxQueuedTransportResponseFrames = 50

type cbItem interface {
	isTransportResponseFrame() bool
}

// registerStream is used to register an incoming stream with loopy writer.
type registerStream struct {
	streamID uint32
	wq       *writeQuota
}

func (*registerStream) isTransportResponseFrame() bool { return false }

// headerFrame is also used to register a stream on the client-side.
type headerFrame struct {
	streamID   uint32
	hf         []hpack.HeaderField
	endStream  bool               // Valid on server side.
	initStream func(uint32) error // Used only on the client side.
	onWrite    func()
	wq         *writeQuota    // write quota for the stream created.
	cleanup    *cleanupStream // Valid on the server side.
	onOrphaned func(error)    // Valid on client-side
}

func (h *headerFrame) isTransportResponseFrame() bool {
	return h.cleanup != nil && h.cleanup.rst // Results in a RST_STREAM
}

type cleanupStream struct {
	streamID uint32
	rst      bool
	rstCode  http2.ErrCode
	onWrite  func()
}

func (c *cleanupStream) isTransportResponseFrame() bool { return c.rst } // Results in a RST_STREAM

type dataFrame struct {
	streamID  uint32
	endStream bool
	h         []byte
	d         []byte
	// onEachWrite is called every time
	// a part of d is written out.
	onEachWrite func()
}

func (*dataFrame) isTransportResponseFrame() bool { return false }

type incomingWindowUpdate struct {
	streamID  uint32
	increment uint32
}

func (*incomingWindowUpdate) isTransportResponseFrame() bool { return false }

type outgoingWindowUpdate struct {
	streamID  uint32
	increment uint32
}

func (*outgoingWindowUpdate) isTransportResponseFrame() bool {
	return false // window updates are throttled by thresholds
}

type incomingSettings struct {
	ss []http2.Setting
}

func (*incomingSettings) isTransportResponseFrame() bool { return true } // Results in a settings ACK

type outgoingSettings struct {
	ss []http2.Setting
}

func (*outgoingSettings) isTransportResponseFrame() bool { return false }

type incomingGoAway struct {
}

func (*incomingGoAway) isTransportResponseFrame() bool { return false }

type goAway struct {
	code      http2.ErrCode
	debugData []byte
	headsUp   bool
	closeConn bool
}

func (*goAway) isTransportResponseFrame() bool { return false }

type ping struct {
	ack  bool
	data [8]byte
}

func (*ping) isTransportResponseFrame() bool { return true }

type outFlowControlSizeRequest struct {
	resp chan uint32
}

func (*outFlowControlSizeRequest) isTransportResponseFrame() bool { return false }

type outStreamState int

const (
	active outStreamState = iota
	empty
	waitingOnStreamQuota
)

type outStream struct {
	id               uint32
	state            outStreamState
	itl              *itemList
	bytesOutStanding int
	wq               *writeQuota

	next *outStream
	prev *outStream
}

func (s *outStream) deleteSelf() {
	if s.prev != nil {
		s.prev.next = s.next
	}
	if s.next != nil {
		s.next.prev = s.prev
	}
	s.next, s.prev = nil, nil
}

type outStreamList struct {
	// Following are sentinel objects that mark the
	// beginning and end of the list. They do not
	// contain any item lists. All valid objects are
	// inserted in between them.
	// This is needed so that an outStream object can
	// deleteSelf() in O(1) time without knowing which
	// list it belongs to.
	head *outStream
	tail *outStream
}

func newOutStreamList() *outStreamList {
	head, tail := new(outStream), new(outStream)
	head.next = tail
	tail.prev = head
	return &outStreamList{
		head: head,
		tail: tail,
	}
}

func (l *outStreamList) enqueue(s *outStream) {
	e := l.tail.prev
	e.next = s
	s.prev = e
	s.next = l.tail
	l.tail.prev = s
}

// dequeue removes an item from the beginning of the list.
func (l *outStreamList) dequeue() *outStream {
	b := l.head.next
	if b == l.tail {
		return nil
	}
	b.deleteSelf()
	return b
}
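
// Illustrative sketch, not part of the original file: because head and tail
// are sentinel nodes, an outStream can unlink itself via deleteSelf in O(1)
// without knowing which list it is on, and dequeue returns nil once only the
// sentinels remain. The helper below is hypothetical.
func exampleOutStreamListRemoval() {
	l := newOutStreamList()
	a := &outStream{id: 1}
	b := &outStream{id: 3}
	l.enqueue(a)
	l.enqueue(b)
	a.deleteSelf()  // O(1) removal; no traversal of the list is needed
	_ = l.dequeue() // returns b, the only remaining stream
	if l.dequeue() == nil {
		// Only the sentinels are left; the list is empty again.
	}
}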

// controlBuffer is a way to pass information to loopy.
// Information is passed as specific struct types called control frames.
// A control frame not only represents data, messages or headers to be sent out
// but can also be used to instruct loopy to update its internal state.
// It shouldn't be confused with an HTTP2 frame, although some of the control frames
// like dataFrame and headerFrame do go out on the wire as HTTP2 frames.
type controlBuffer struct {
	ch              chan struct{}
	done            <-chan struct{}
	mu              sync.Mutex
	consumerWaiting bool
	list            *itemList
	err             error

	// transportResponseFrames counts the number of queued items that represent
	// the response of an action initiated by the peer.  trfChan is created
	// when transportResponseFrames >= maxQueuedTransportResponseFrames and is
	// closed and nilled when transportResponseFrames drops below the
	// threshold.  Both fields are protected by mu.
	transportResponseFrames int
	trfChan                 atomic.Value // *chan struct{}
}

func newControlBuffer(done <-chan struct{}) *controlBuffer {
	return &controlBuffer{
		ch:   make(chan struct{}, 1),
		list: &itemList{},
		done: done,
	}
}
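
// Illustrative sketch, not part of the original file, of the controlBuffer
// producer/consumer contract: any goroutine may put control frames, while a
// single consumer (loopy) retrieves them with get, blocking when the buffer
// is empty. The helper name and the done channel are hypothetical.
func exampleControlBufferPutGet() error {
	done := make(chan struct{})
	cb := newControlBuffer(done)
	if err := cb.put(&ping{ack: false}); err != nil {
		return err
	}
	it, err := cb.get(true) // blocks until an item is available or done closes
	if err != nil {
		return err
	}
	_ = it.(*ping) // items come back in FIFO order
	return nil
}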

// throttle blocks if there are too many incomingSettings/cleanupStreams in the
// controlbuf.
func (c *controlBuffer) throttle() {
	ch, _ := c.trfChan.Load().(*chan struct{})
	if ch != nil {
		select {
		case <-*ch:
		case <-c.done:
		}
	}
}
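
// Illustrative sketch, not part of the original file: a transport's reader
// loop can call throttle before each frame read so that, once too many
// transport-response frames pile up in the buffer, reading (and therefore
// queueing even more responses) pauses until loopy drains the buffer below
// the threshold. The readFrame callback below is hypothetical.
func exampleReaderThrottle(cb *controlBuffer, readFrame func() (interface{}, error)) error {
	for {
		cb.throttle() // blocks while transportResponseFrames >= maxQueuedTransportResponseFrames
		frame, err := readFrame()
		if err != nil {
			return err
		}
		_ = frame // dispatch the frame; responses may end up in cb via put
	}
}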

func (c *controlBuffer) put(it cbItem) error {
	_, err := c.executeAndPut(nil, it)
	return err
}

func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it cbItem) (bool, error) {
	var wakeUp bool
	c.mu.Lock()
	if c.err != nil {
		c.mu.Unlock()
		return false, c.err
	}
	if f != nil {
		if !f(it) { // f wasn't successful
			c.mu.Unlock()
			return false, nil
		}
	}
	if c.consumerWaiting {
		wakeUp = true
		c.consumerWaiting = false
	}
	c.list.enqueue(it)
	if it.isTransportResponseFrame() {
		c.transportResponseFrames++
		if c.transportResponseFrames == maxQueuedTransportResponseFrames {
			// We are adding the frame that puts us over the threshold; create
			// a throttling channel.
			ch := make(chan struct{})
			c.trfChan.Store(&ch)
		}
	}
	c.mu.Unlock()
	if wakeUp {
		select {
		case c.ch <- struct{}{}:
		default:
		}
	}
	return true, nil
}
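
// Illustrative sketch, not part of the original file: executeAndPut lets a
// caller verify a condition and enqueue an item atomically under the
// buffer's lock. The gate callback and the helper itself are hypothetical.
func exampleExecuteAndPut(cb *controlBuffer, gate func() bool) (bool, error) {
	h := &headerFrame{streamID: 1, onWrite: func() {}}
	return cb.executeAndPut(func(it interface{}) bool {
		// Runs with cb.mu held; returning false drops the item without queueing it.
		return gate()
	}, h)
}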

// Note: the argument f should never be nil.
func (c *controlBuffer) execute(f func(it interface{}) bool, it interface{}) (bool, error) {
	c.mu.Lock()
	if c.err != nil {
		c.mu.Unlock()
		return false, c.err
	}
	if !f(it) { // f wasn't successful
		c.mu.Unlock()
		return false, nil
	}
	c.mu.Unlock()
	return true, nil
}

func (c *controlBuffer) get(block bool) (interface{}, error) {
	for {
		c.mu.Lock()
		if c.err != nil {
			c.mu.Unlock()
			return nil, c.err
		}
		if !c.list.isEmpty() {
			h := c.list.dequeue().(cbItem)
			if h.isTransportResponseFrame() {
				if c.transportResponseFrames == maxQueuedTransportResponseFrames {
					// We are removing the frame that put us over the
					// threshold; close and clear the throttling channel.
					ch := c.trfChan.Load().(*chan struct{})
					close(*ch)
					c.trfChan.Store((*chan struct{})(nil))
				}
				c.transportResponseFrames--
			}
			c.mu.Unlock()
			return h, nil
		}
		if !block {
			c.mu.Unlock()
			return nil, nil
		}
		c.consumerWaiting = true
		c.mu.Unlock()
		select {
		case <-c.ch:
		case <-c.done:
			c.finish()
			return nil, ErrConnClosing
		}
	}
}

func (c *controlBuffer) finish() {
	c.mu.Lock()
	if c.err != nil {
		c.mu.Unlock()
		return
	}
	c.err = ErrConnClosing
	// There may be headers for streams in the control buffer.
	// These streams need to be cleaned out since the transport
	// is not aware of them yet.
	for head := c.list.dequeueAll(); head != nil; head = head.next {
		hdr, ok := head.it.(*headerFrame)
		if !ok {
			continue
		}
		if hdr.onOrphaned != nil { // It will be nil on the server-side.
			hdr.onOrphaned(ErrConnClosing)
		}
	}
	c.mu.Unlock()
}

type side int

const (
	clientSide side = iota
	serverSide
)

// Loopy receives frames from the control buffer.
// Each frame is handled individually; most of the work done by loopy goes
// into handling data frames. Loopy maintains a queue of active streams, and each
// stream maintains a queue of data frames; as loopy receives data frames,
// they get added to the queue of the relevant stream.
// Loopy goes over this list of active streams by processing one node every iteration,
// thereby closely resembling round-robin scheduling over all streams. While
// processing a stream, loopy writes out data bytes from this stream capped by the min
// of http2MaxFrameLen, connection-level flow control and stream-level flow control.
type loopyWriter struct {
	side      side
	cbuf      *controlBuffer
	sendQuota uint32
	oiws      uint32 // outbound initial window size.
	// estdStreams is a map of all established streams that are not cleaned-up yet.
	// On client-side, this is all streams whose headers were sent out.
	// On server-side, this is all streams whose headers were received.
	estdStreams map[uint32]*outStream // Established streams.
	// activeStreams is a linked-list of all streams that have data to send and some
	// stream-level flow control quota.
	// Each of these streams internally has a list of data items (and perhaps trailers
	// on the server-side) to be sent out.
	activeStreams *outStreamList
	framer        *framer
	hBuf          *bytes.Buffer  // The buffer for HPACK encoding.
	hEnc          *hpack.Encoder // HPACK encoder.
	bdpEst        *bdpEstimator
	draining      bool

	// Side-specific handlers
	ssGoAwayHandler func(*goAway) (bool, error)
}

func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator) *loopyWriter {
	var buf bytes.Buffer
	l := &loopyWriter{
		side:          s,
		cbuf:          cbuf,
		sendQuota:     defaultWindowSize,
		oiws:          defaultWindowSize,
		estdStreams:   make(map[uint32]*outStream),
		activeStreams: newOutStreamList(),
		framer:        fr,
		hBuf:          &buf,
		hEnc:          hpack.NewEncoder(&buf),
		bdpEst:        bdpEst,
	}
	return l
}
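
// Illustrative sketch, not part of the original file: a transport creates
// one loopyWriter per connection and runs it on its own goroutine, after
// which every outgoing frame is funnelled through the control buffer. The
// helper and its arguments are hypothetical stand-ins for the transport's
// framer, bdpEstimator and closing channel.
func exampleStartLoopy(fr *framer, bdpEst *bdpEstimator, done chan struct{}) *controlBuffer {
	cbuf := newControlBuffer(done)
	l := newLoopyWriter(clientSide, fr, cbuf, bdpEst)
	go l.run() // exits when cbuf is finished or a write error occurs
	return cbuf
}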

const minBatchSize = 1000

// run should be run in a separate goroutine.
// It reads control frames from controlBuf and processes them by:
// 1. Updating loopy's internal state, and/or
// 2. Writing out HTTP2 frames on the wire.
//
// Loopy keeps all active streams with data to send in a linked-list.
// All streams in the activeStreams linked-list must have both:
// 1. Data to send, and
// 2. Stream level flow control quota available.
//
// In each iteration of the run loop, other than processing the incoming control
// frame, loopy calls processData, which processes one node from the activeStreams linked-list.
// This results in HTTP2 frames being written into an underlying write buffer.
// When there are no more control frames to read from controlBuf, loopy flushes the write buffer.
// As an optimization, to increase the batch size for each flush, loopy yields the processor once,
// if the batch size is too low, to give stream goroutines a chance to fill it up.
func (l *loopyWriter) run() (err error) {
	defer func() {
		if err == ErrConnClosing {
			// Don't log ErrConnClosing as an error since it happens
			// 1. When the connection is closed by some other known issue.
			// 2. User closed the connection.
			// 3. A graceful close of connection.
			if logger.V(logLevel) {
				logger.Infof("transport: loopyWriter.run returning. %v", err)
			}
			err = nil
		}
	}()
	for {
		it, err := l.cbuf.get(true)
		if err != nil {
			return err
		}
		if err = l.handle(it); err != nil {
			return err
		}
		if _, err = l.processData(); err != nil {
			return err
		}
		gosched := true
	hasdata:
		for {
			it, err := l.cbuf.get(false)
			if err != nil {
				return err
			}
			if it != nil {
				if err = l.handle(it); err != nil {
					return err
				}
				if _, err = l.processData(); err != nil {
					return err
				}
				continue hasdata
			}
			isEmpty, err := l.processData()
			if err != nil {
				return err
			}
			if !isEmpty {
				continue hasdata
			}
			if gosched {
				gosched = false
				if l.framer.writer.offset < minBatchSize {
					runtime.Gosched()
					continue hasdata
				}
			}
			l.framer.writer.Flush()
			break hasdata
		}
	}
}

func (l *loopyWriter) outgoingWindowUpdateHandler(w *outgoingWindowUpdate) error {
	return l.framer.fr.WriteWindowUpdate(w.streamID, w.increment)
}

func (l *loopyWriter) incomingWindowUpdateHandler(w *incomingWindowUpdate) error {
	// A streamID of 0 refers to the connection-level flow-control window.
	if w.streamID == 0 {
		l.sendQuota += w.increment
		return nil
	}
	// Find the stream and update it.
	if str, ok := l.estdStreams[w.streamID]; ok {
		str.bytesOutStanding -= int(w.increment)
		if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota > 0 && str.state == waitingOnStreamQuota {
			str.state = active
			l.activeStreams.enqueue(str)
			return nil
		}
	}
	return nil
}
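
// Illustrative sketch, not part of the original file: when the reader sees a
// WINDOW_UPDATE frame from the peer, it forwards it to loopy through the
// control buffer; an ID of 0 replenishes the connection-level sendQuota,
// while any other ID replenishes that stream's quota and may re-activate it.
func exampleForwardWindowUpdate(cb *controlBuffer, streamID, increment uint32) error {
	return cb.put(&incomingWindowUpdate{streamID: streamID, increment: increment})
}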

func (l *loopyWriter) outgoingSettingsHandler(s *outgoingSettings) error {
	return l.framer.fr.WriteSettings(s.ss...)
}

func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error {
	if err := l.applySettings(s.ss); err != nil {
		return err
	}
	return l.framer.fr.WriteSettingsAck()
}

func (l *loopyWriter) registerStreamHandler(h *registerStream) error {
	str := &outStream{
		id:    h.streamID,
		state: empty,
		itl:   &itemList{},
		wq:    h.wq,
	}
	l.estdStreams[h.streamID] = str
	return nil
}

func (l *loopyWriter) headerHandler(h *headerFrame) error {
	if l.side == serverSide {
		str, ok := l.estdStreams[h.streamID]
		if !ok {
			if logger.V(logLevel) {
				logger.Warningf("transport: loopy doesn't recognize the stream: %d", h.streamID)
			}
			return nil
		}
		// Case 1.A: Server is responding back with headers.
		if !h.endStream {
			return l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite)
		}
		// else:  Case 1.B: Server wants to close stream.

		if str.state != empty { // either active or waiting on stream quota.
			// add it to str's list of items.
			str.itl.enqueue(h)
			return nil
		}
		if err := l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite); err != nil {
			return err
		}
		return l.cleanupStreamHandler(h.cleanup)
	}
	// Case 2: Client wants to originate stream.
	str := &outStream{
		id:    h.streamID,
		state: empty,
		itl:   &itemList{},
		wq:    h.wq,
	}
	str.itl.enqueue(h)
	return l.originateStream(str)
}

func (l *loopyWriter) originateStream(str *outStream) error {
	hdr := str.itl.dequeue().(*headerFrame)
	if err := hdr.initStream(str.id); err != nil {
		if err == ErrConnClosing {
			return err
		}
		// Other errors (errStreamDrain) need not close the transport.
		return nil
	}
	if err := l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
		return err
	}
	l.estdStreams[str.id] = str
	return nil
}

func (l *loopyWriter) writeHeader(streamID uint32, endStream bool, hf []hpack.HeaderField, onWrite func()) error {
	if onWrite != nil {
		onWrite()
	}
	l.hBuf.Reset()
	for _, f := range hf {
		if err := l.hEnc.WriteField(f); err != nil {
			if logger.V(logLevel) {
				logger.Warningf("transport: loopyWriter.writeHeader encountered error while encoding headers: %v", err)
			}
		}
	}
	var (
		err               error
		endHeaders, first bool
	)
	first = true
	for !endHeaders {
		size := l.hBuf.Len()
		if size > http2MaxFrameLen {
			size = http2MaxFrameLen
		} else {
			endHeaders = true
		}
		if first {
			first = false
			err = l.framer.fr.WriteHeaders(http2.HeadersFrameParam{
				StreamID:      streamID,
				BlockFragment: l.hBuf.Next(size),
				EndStream:     endStream,
				EndHeaders:    endHeaders,
			})
		} else {
			err = l.framer.fr.WriteContinuation(
				streamID,
				endHeaders,
				l.hBuf.Next(size),
			)
		}
		if err != nil {
			return err
		}
	}
	return nil
}
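
// Illustrative sketch, not part of the original file: callers queue a
// headerFrame with a pre-built hpack.HeaderField slice; writeHeader then
// HPACK-encodes it and splits the block into a HEADERS frame followed by
// CONTINUATION frames of at most http2MaxFrameLen bytes each. The helper
// and the field values below are hypothetical.
func exampleQueueResponseHeaders(cb *controlBuffer, streamID uint32) error {
	return cb.put(&headerFrame{
		streamID: streamID,
		hf: []hpack.HeaderField{
			{Name: ":status", Value: "200"},
			{Name: "content-type", Value: "application/grpc"},
		},
		endStream: false,
		onWrite:   func() {},
	})
}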

func (l *loopyWriter) preprocessData(df *dataFrame) error {
	str, ok := l.estdStreams[df.streamID]
	if !ok {
		return nil
	}
	// If we got data for a stream it means that the
	// stream was originated and the headers were sent out.
	str.itl.enqueue(df)
	if str.state == empty {
		str.state = active
		l.activeStreams.enqueue(str)
	}
	return nil
}
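
// Illustrative sketch, not part of the original file: a dataFrame carries
// the gRPC message header in h and the message payload in d; processData
// later coalesces the two when cutting HTTP/2 DATA frames. The helper and
// its arguments are hypothetical.
func exampleQueueData(cb *controlBuffer, streamID uint32, hdr, payload []byte) error {
	return cb.put(&dataFrame{
		streamID:    streamID,
		endStream:   true, // assume this is the last message on the stream
		h:           hdr,
		d:           payload,
		onEachWrite: func() {},
	})
}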

func (l *loopyWriter) pingHandler(p *ping) error {
	if !p.ack {
		l.bdpEst.timesnap(p.data)
	}
	return l.framer.fr.WritePing(p.ack, p.data)
}

func (l *loopyWriter) outFlowControlSizeRequestHandler(o *outFlowControlSizeRequest) error {
	o.resp <- l.sendQuota
	return nil
}

func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
	c.onWrite()
	if str, ok := l.estdStreams[c.streamID]; ok {
		// On the server side it could be a trailers-only response or
		// a RST_STREAM before stream initialization, thus the stream might
		// not be established yet.
		delete(l.estdStreams, c.streamID)
		str.deleteSelf()
	}
	if c.rst { // If RST_STREAM needs to be sent.
		if err := l.framer.fr.WriteRSTStream(c.streamID, c.rstCode); err != nil {
			return err
		}
	}
	if l.side == clientSide && l.draining && len(l.estdStreams) == 0 {
		return ErrConnClosing
	}
	return nil
}

func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error {
	if l.side == clientSide {
		l.draining = true
		if len(l.estdStreams) == 0 {
			return ErrConnClosing
		}
	}
	return nil
}

func (l *loopyWriter) goAwayHandler(g *goAway) error {
	// Handling of outgoing GoAway is very specific to the side.
	if l.ssGoAwayHandler != nil {
		draining, err := l.ssGoAwayHandler(g)
		if err != nil {
			return err
		}
		l.draining = draining
	}
	return nil
}

func (l *loopyWriter) handle(i interface{}) error {
	switch i := i.(type) {
	case *incomingWindowUpdate:
		return l.incomingWindowUpdateHandler(i)
	case *outgoingWindowUpdate:
		return l.outgoingWindowUpdateHandler(i)
	case *incomingSettings:
		return l.incomingSettingsHandler(i)
	case *outgoingSettings:
		return l.outgoingSettingsHandler(i)
	case *headerFrame:
		return l.headerHandler(i)
	case *registerStream:
		return l.registerStreamHandler(i)
	case *cleanupStream:
		return l.cleanupStreamHandler(i)
	case *incomingGoAway:
		return l.incomingGoAwayHandler(i)
	case *dataFrame:
		return l.preprocessData(i)
	case *ping:
		return l.pingHandler(i)
	case *goAway:
		return l.goAwayHandler(i)
	case *outFlowControlSizeRequest:
		return l.outFlowControlSizeRequestHandler(i)
	default:
		return fmt.Errorf("transport: unknown control message type %T", i)
	}
}

func (l *loopyWriter) applySettings(ss []http2.Setting) error {
	for _, s := range ss {
		switch s.ID {
		case http2.SettingInitialWindowSize:
			o := l.oiws
			l.oiws = s.Val
			if o < l.oiws {
				// If the new limit is greater, make all depleted streams active.
				for _, stream := range l.estdStreams {
					if stream.state == waitingOnStreamQuota {
						stream.state = active
						l.activeStreams.enqueue(stream)
					}
				}
			}
		case http2.SettingHeaderTableSize:
			updateHeaderTblSize(l.hEnc, s.Val)
		}
	}
	return nil
}
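
// Illustrative sketch, not part of the original file: SETTINGS received from
// the peer are forwarded to loopy as an incomingSettings item; loopy applies
// them (possibly re-activating streams waiting on quota when the initial
// window size grows) and then writes the SETTINGS ACK.
func exampleForwardPeerSettings(cb *controlBuffer, ss []http2.Setting) error {
	return cb.put(&incomingSettings{ss: ss})
}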

// processData removes the first stream from active streams, writes out at most 16KB
// of its data and then puts it at the end of activeStreams if there's still more data
// to be sent and the stream has some stream-level flow control quota left.
func (l *loopyWriter) processData() (bool, error) {
	if l.sendQuota == 0 {
		return true, nil
	}
	str := l.activeStreams.dequeue() // Remove the first stream.
	if str == nil {
		return true, nil
	}
	dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item of this stream.
	// A data item is represented by a dataFrame, since it later translates into
	// multiple HTTP2 data frames.
	// Every dataFrame has two buffers: h, which holds the grpc-message header, and d, which holds the actual data.
	// As an optimization to keep wire traffic low, data from d is copied into h to make the frame as big as the
	// maximum possible HTTP2 frame size.

	if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // Empty data frame
		// Client sends out empty data frame with endStream = true
		if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil {
			return false, err
		}
		str.itl.dequeue() // remove the empty data item from stream
		if str.itl.isEmpty() {
			str.state = empty
		} else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers.
			if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
				return false, err
			}
			if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
				return false, err
			}
		} else {
			l.activeStreams.enqueue(str)
		}
		return false, nil
	}
	var (
		buf []byte
	)
	// Figure out the maximum size we can send
	maxSize := http2MaxFrameLen
	if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota <= 0 { // stream-level flow control.
		str.state = waitingOnStreamQuota
		return false, nil
	} else if maxSize > strQuota {
		maxSize = strQuota
	}
	if maxSize > int(l.sendQuota) { // connection-level flow control.
		maxSize = int(l.sendQuota)
	}
	// Compute how much of the header and data we can send within quota and max frame length
	hSize := min(maxSize, len(dataItem.h))
	dSize := min(maxSize-hSize, len(dataItem.d))
	if hSize != 0 {
		if dSize == 0 {
			buf = dataItem.h
		} else {
			// We can add some data to grpc message header to distribute bytes more equally across frames.
			// Copy on the stack to avoid generating garbage
			var localBuf [http2MaxFrameLen]byte
			copy(localBuf[:hSize], dataItem.h)
			copy(localBuf[hSize:], dataItem.d[:dSize])
			buf = localBuf[:hSize+dSize]
		}
	} else {
		buf = dataItem.d
	}

	size := hSize + dSize

	// Now that outgoing flow controls are checked we can replenish str's write quota
	str.wq.replenish(size)
	var endStream bool
	// If this is the last data message on this stream and all of it can be written in this iteration.
	if dataItem.endStream && len(dataItem.h)+len(dataItem.d) <= size {
		endStream = true
	}
	if dataItem.onEachWrite != nil {
		dataItem.onEachWrite()
	}
	if err := l.framer.fr.WriteData(dataItem.streamID, endStream, buf[:size]); err != nil {
		return false, err
	}
	str.bytesOutStanding += size
	l.sendQuota -= uint32(size)
	dataItem.h = dataItem.h[hSize:]
	dataItem.d = dataItem.d[dSize:]

	if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // All the data from that message was written out.
		str.itl.dequeue()
	}
	if str.itl.isEmpty() {
		str.state = empty
	} else if trailer, ok := str.itl.peek().(*headerFrame); ok { // The next item is trailers.
		if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
			return false, err
		}
		if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
			return false, err
		}
	} else if int(l.oiws)-str.bytesOutStanding <= 0 { // Ran out of stream quota.
		str.state = waitingOnStreamQuota
	} else { // Otherwise add it back to the list of active streams.
		l.activeStreams.enqueue(str)
	}
	return false, nil
}
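
// Illustrative sketch, not part of the original file: the number of bytes
// processData writes for a stream in one visit is capped by the minimum of
// http2MaxFrameLen, the stream-level quota (oiws - bytesOutStanding) and the
// connection-level sendQuota. The helper below only mirrors that arithmetic.
func exampleFrameSizeCap(oiws uint32, bytesOutStanding int, sendQuota uint32, pending int) int {
	strQuota := int(oiws) - bytesOutStanding
	if strQuota <= 0 || sendQuota == 0 {
		return 0 // the stream or the connection is out of flow-control quota
	}
	maxSize := http2MaxFrameLen
	if strQuota < maxSize {
		maxSize = strQuota
	}
	if int(sendQuota) < maxSize {
		maxSize = int(sendQuota)
	}
	return min(maxSize, pending)
}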

func min(a, b int) int {
	if a < b {
		return a
	}
	return b
}