cubefs

Форк
0
/x
/
writesched_random.go 
77 строк · 2.0 Кб
1
// Copyright 2014 The Go Authors. All rights reserved.
2
// Use of this source code is governed by a BSD-style
3
// license that can be found in the LICENSE file.
4

5
package http2
6

7
import "math"
8

9
// NewRandomWriteScheduler constructs a WriteScheduler that ignores HTTP/2
10
// priorities. Control frames like SETTINGS and PING are written before DATA
11
// frames, but if no control frames are queued and multiple streams have queued
12
// HEADERS or DATA frames, Pop selects a ready stream arbitrarily.
13
func NewRandomWriteScheduler() WriteScheduler {
14
	return &randomWriteScheduler{sq: make(map[uint32]*writeQueue)}
15
}
16

17
type randomWriteScheduler struct {
18
	// zero are frames not associated with a specific stream.
19
	zero writeQueue
20

21
	// sq contains the stream-specific queues, keyed by stream ID.
22
	// When a stream is idle, closed, or emptied, it's deleted
23
	// from the map.
24
	sq map[uint32]*writeQueue
25

26
	// pool of empty queues for reuse.
27
	queuePool writeQueuePool
28
}
29

30
func (ws *randomWriteScheduler) OpenStream(streamID uint32, options OpenStreamOptions) {
31
	// no-op: idle streams are not tracked
32
}
33

34
func (ws *randomWriteScheduler) CloseStream(streamID uint32) {
35
	q, ok := ws.sq[streamID]
36
	if !ok {
37
		return
38
	}
39
	delete(ws.sq, streamID)
40
	ws.queuePool.put(q)
41
}
42

43
func (ws *randomWriteScheduler) AdjustStream(streamID uint32, priority PriorityParam) {
44
	// no-op: priorities are ignored
45
}
46

47
func (ws *randomWriteScheduler) Push(wr FrameWriteRequest) {
48
	if wr.isControl() {
49
		ws.zero.push(wr)
50
		return
51
	}
52
	id := wr.StreamID()
53
	q, ok := ws.sq[id]
54
	if !ok {
55
		q = ws.queuePool.get()
56
		ws.sq[id] = q
57
	}
58
	q.push(wr)
59
}
60

61
func (ws *randomWriteScheduler) Pop() (FrameWriteRequest, bool) {
62
	// Control and RST_STREAM frames first.
63
	if !ws.zero.empty() {
64
		return ws.zero.shift(), true
65
	}
66
	// Iterate over all non-idle streams until finding one that can be consumed.
67
	for streamID, q := range ws.sq {
68
		if wr, ok := q.consume(math.MaxInt32); ok {
69
			if q.empty() {
70
				delete(ws.sq, streamID)
71
				ws.queuePool.put(q)
72
			}
73
			return wr, true
74
		}
75
	}
76
	return FrameWriteRequest{}, false
77
}
78

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.