1
// Copyright 2016 The Go Authors. All rights reserved.
2
// Use of this source code is governed by a BSD-style
3
// license that can be found in the LICENSE file.
13
// RFC 7540, Section 5.3.5: the default weight is 16.
// priorityDefaultWeight holds that default in the zero-based form stored in
// priorityNode.weight, whose effective value is weight+1.
14
const priorityDefaultWeight = 15 // 16 = 15 + 1
16
// PriorityWriteSchedulerConfig configures a priorityWriteScheduler.
// It is passed to NewPriorityWriteScheduler; a nil config selects defaults.
//
// NOTE(review): this copy of the declaration is missing lines, including
// its closing brace.
17
type PriorityWriteSchedulerConfig struct {
18
// MaxClosedNodesInTree controls the maximum number of closed streams to
19
// retain in the priority tree. Setting this to zero saves a small amount
20
// of memory at the cost of performance.
22
// See RFC 7540, Section 5.3.4:
23
// "It is possible for a stream to become closed while prioritization
24
// information ... is in transit. ... This potentially creates suboptimal
25
// prioritization, since the stream could be given a priority that is
26
// different from what is intended. To avoid these problems, an endpoint
27
// SHOULD retain stream prioritization state for a period after streams
28
// become closed. The longer state is retained, the lower the chance that
29
// streams are assigned incorrect or default priority values."
30
MaxClosedNodesInTree int
32
// MaxIdleNodesInTree controls the maximum number of idle streams to
33
// retain in the priority tree. Setting this to zero saves a small amount
34
// of memory at the cost of performance.
36
// See RFC 7540, Section 5.3.4:
37
// "Similarly, streams that are in the "idle" state can be assigned
38
// priority or become a parent of other streams. This allows for the
39
// creation of a grouping node in the dependency tree, which enables
40
// more flexible expressions of priority. Idle streams begin with a
41
// default priority (Section 5.3.5)."
42
MaxIdleNodesInTree int
44
// ThrottleOutOfOrderWrites enables write throttling to help ensure that
45
// data is delivered in priority order. This works around a race where
46
// stream B depends on stream A and both streams are about to call Write
47
// to queue DATA frames. If B wins the race, a naive scheduler would eagerly
48
// write as much data from B as possible, but this is suboptimal because A
49
// is a higher-priority stream. With throttling enabled, we write a small
50
// amount of data from B to minimize the amount of bandwidth that B can
// take away from A.
52
ThrottleOutOfOrderWrites bool
55
// NewPriorityWriteScheduler constructs a WriteScheduler that schedules
56
// frames by following HTTP/2 priorities as described in RFC 7540 Section 5.3.
57
// If cfg is nil, default options are used.
//
// NOTE(review): this copy of the function is missing lines (closing braces,
// an `else`, and presumably a final `return ws`); see the notes below.
58
func NewPriorityWriteScheduler(cfg *PriorityWriteSchedulerConfig) WriteScheduler {
60
// For justification of these defaults, see:
61
// https://docs.google.com/document/d/1oLhNg1skaWD4_DtaoCxdSRN5erEXrH-KnLrMwEpOtFY
62
// NOTE(review): per the doc comment these defaults should apply only when
// cfg == nil, but no such guard is visible here; as written, a caller's cfg
// is always discarded. Verify the missing `if cfg == nil {` line.
cfg = &PriorityWriteSchedulerConfig{
63
MaxClosedNodesInTree: 10,
64
MaxIdleNodesInTree: 10,
65
ThrottleOutOfOrderWrites: false,
69
ws := &priorityWriteScheduler{
70
nodes: make(map[uint32]*priorityNode),
71
maxClosedNodesInTree: cfg.MaxClosedNodesInTree,
72
maxIdleNodesInTree: cfg.MaxIdleNodesInTree,
73
enableWriteThrottle: cfg.ThrottleOutOfOrderWrites,
75
// Register the root node under stream ID 0 so lookups of stream 0 find it.
ws.nodes[0] = &ws.root
76
// Initial throttle limit: 1024 bytes when throttling is enabled.
if cfg.ThrottleOutOfOrderWrites {
77
ws.writeThrottleLimit = 1024
79
// NOTE(review): no `} else {` is visible before this line, so as written the
// limit is unconditionally reset to math.MaxInt32 (throttling never takes
// effect). Presumably this is the unthrottled branch; confirm.
ws.writeThrottleLimit = math.MaxInt32
84
// priorityNodeState records whether a tree node's stream is open, closed,
// or idle (see priorityNode.state).
type priorityNodeState int
87
// NOTE(review): only the first constant survives in this copy; the
// `const (` opener and the priorityNodeClosed / priorityNodeIdle values
// referenced elsewhere in this file are missing here.
priorityNodeOpen priorityNodeState = iota
92
// priorityNode is a node in an HTTP/2 priority tree.
93
// Each node is associated with a single stream ID.
94
// See RFC 7540, Section 5.3.
//
// NOTE(review): this copy of the declaration is missing lines, including
// its closing brace.
95
type priorityNode struct {
96
q writeQueue // queue of pending frames to write
97
id uint32 // id of the stream, or 0 for the root of the tree
98
weight uint8 // the actual weight is weight+1, so the value is in [1,256]
99
state priorityNodeState // open | closed | idle
100
bytes int64 // number of bytes written by this node, or 0 if closed
101
subtreeBytes int64 // sum(node.bytes) of all nodes in this subtree
103
// These links form the priority tree.
105
// NOTE(review): a `parent *priorityNode` field is used by setParent and
// addBytes (n.parent) but its declaration is not visible in this copy.
kids *priorityNode // start of the kids list
106
prev, next *priorityNode // doubly-linked list of siblings
109
// setParent moves n under parent, first unlinking it from its current
// parent's kids list. A nil parent detaches n from the tree. Per the
// comments below, n is always inserted at the head of parent.kids.
//
// NOTE(review): this copy is missing lines, including the guard for the
// self-parent panic (presumably `if n == parent`), the early return for an
// unchanged parent, and the unlink/link bodies.
func (n *priorityNode) setParent(parent *priorityNode) {
111
panic("setParent to self")
113
// Already attached to parent: nothing to move.
if n.parent == parent {
116
// Unlink from current parent.
117
// The `parent` declared here shadows the argument and names the OLD parent.
if parent := n.parent; parent != nil {
127
// Link to new parent.
128
// If parent=nil, remove n from the tree.
129
// Always insert at the head of parent.kids (this is assumed by walkReadyInOrder).
144
// addBytes credits b written bytes to n and, walking up via n.parent, to
// each ancestor (presumably updating bytes/subtreeBytes so that
// subtreeBytes stays the sum over the subtree).
//
// NOTE(review): the loop body is missing in this copy; confirm which
// counters are updated.
func (n *priorityNode) addBytes(b int64) {
146
for ; n != nil; n = n.parent {
151
// walkReadyInOrder iterates over the tree in priority order, calling f for each node
152
// with a non-empty write queue. When f returns true, this function returns true and the
153
// walk halts. tmp is used as scratch space for sorting.
155
// f(n, openParent) takes two arguments: the node to visit, n, and a bool that is true
156
// if any ancestor p of n is still open (ignoring the root node).
//
// n itself is offered to f before its kids. Kids are visited in list order
// when they all share one weight; otherwise they are sorted with
// sortPriorityNodeSiblings and re-linked under n in that order first.
//
// NOTE(review): several lines of this function (early returns, loop bodies
// and closing braces) are missing in this copy.
157
func (n *priorityNode) walkReadyInOrder(openParent bool, tmp *[]*priorityNode, f func(*priorityNode, bool) bool) bool {
158
// Offer n itself first if it has queued frames.
// NOTE(review): the `return true` for this branch is missing in this copy.
if !n.q.empty() && f(n, openParent) {
165
// Don't consider the root "open" when updating openParent since
166
// we can't send data frames on the root stream (only control frames).
168
// NOTE(review): the comment above implies an `n.id != 0` guard, which is
// not visible around this assignment in this copy.
openParent = openParent || (n.state == priorityNodeOpen)
171
// Common case: only one kid or all kids have the same weight.
172
// Some clients don't use weights; other clients (like web browsers)
173
// use mostly-linear priority trees.
176
// Presumably checks whether any later kid's weight differs from the first
// kid's (loop body missing). n.kids.next dereferences n.kids, so an early
// return for n.kids == nil is presumably among the missing lines.
for k := n.kids.next; k != nil; k = k.next {
183
for k := n.kids; k != nil; k = k.next {
184
if k.walkReadyInOrder(openParent, tmp, f) {
191
// Uncommon case: sort the child nodes. We remove the kids from the parent,
192
// then re-insert after sorting so we can reuse tmp for future sort calls.
195
// Move the current head kid into *tmp and detach it from n (the enclosing
// loop header is missing in this copy).
*tmp = append(*tmp, n.kids)
196
n.kids.setParent(nil)
198
sort.Sort(sortPriorityNodeSiblings(*tmp))
199
// Re-link in reverse so that, with head insertion, n.kids ends up in
// sorted order.
for i := len(*tmp) - 1; i >= 0; i-- {
200
(*tmp)[i].setParent(n) // setParent inserts at the head of n.kids
202
for k := n.kids; k != nil; k = k.next {
203
if k.walkReadyInOrder(openParent, tmp, f) {
210
// sortPriorityNodeSiblings adapts a slice of sibling nodes to sort.Interface
// (it is passed to sort.Sort in walkReadyInOrder); Less defines the order.
type sortPriorityNodeSiblings []*priorityNode
212
// Len reports the number of siblings being sorted.
func (z sortPriorityNodeSiblings) Len() int { return len(z) }
213
// Swap exchanges the siblings at positions i and k.
func (z sortPriorityNodeSiblings) Swap(i, k int) { z[i], z[k] = z[k], z[i] }
214
// Less reports whether sibling i should be visited before sibling k: the
// subtree that has sent fewer bytes relative to its weight comes first.
//
// The stored weight is zero-based (effective weight = weight+1, in [1,256]).
// Convert to float64 BEFORE adding 1: weight is a uint8, so weight+1
// evaluated in uint8 wraps 255 to 0, giving a maximum-weight (256) stream an
// effective weight of 0 and pushing it to the back of the order.
//
// NOTE(review): this copy is missing lines, including the body of the
// both-zero branch and the closing braces.
func (z sortPriorityNodeSiblings) Less(i, k int) bool {
215
// Prefer the subtree that has sent fewer bytes relative to its weight.
216
// See sections 5.3.2 and 5.3.4.
217
wi, bi := float64(z[i].weight)+1, float64(z[i].subtreeBytes)
218
wk, bk := float64(z[k].weight)+1, float64(z[k].subtreeBytes)
219
if bi == 0 && bk == 0 {
225
return bi/bk <= wi/wk
228
// priorityWriteScheduler is the scheduler built by NewPriorityWriteScheduler.
// It keeps a priority tree of streams and serves frames in tree order.
//
// NOTE(review): this copy of the declaration is missing lines: the `root`
// (a priorityNode, used as &ws.root), `maxID` (compared against uint32
// stream IDs) and `tmp` (passed as &ws.tmp to walkReadyInOrder) fields, and
// the closing brace.
type priorityWriteScheduler struct {
229
// root is the root of the priority tree, where root.id = 0.
230
// The root queues control frames that are not associated with any stream.
233
// nodes maps stream ids to priority tree nodes.
234
nodes map[uint32]*priorityNode
236
// maxID is the maximum stream id in nodes.
239
// lists of nodes that have been closed or are idle, but are kept in
240
// the tree for improved prioritization. When a list reaches
241
// maxClosedNodesInTree or maxIdleNodesInTree, its oldest node is discarded.
242
closedNodes, idleNodes []*priorityNode
245
maxClosedNodesInTree int
246
maxIdleNodesInTree int
247
// writeThrottleLimit caps bytes consumed per Pop for throttled writes;
// enableWriteThrottle mirrors the ThrottleOutOfOrderWrites config option.
writeThrottleLimit int32
248
enableWriteThrottle bool
250
// tmp is scratch space for priorityNode.walkReadyInOrder to reduce allocations.
253
// pool of empty queues for reuse.
254
queuePool writeQueuePool
257
// OpenStream registers streamID in the priority tree as an open stream.
// If a node for streamID already exists it must be idle (otherwise this
// panics); that idle node is switched to the open state. Otherwise a new
// open node with the default weight is created, with the node for
// options.PusherID looked up as its parent, and stored in ws.nodes.
//
// NOTE(review): several lines are missing in this copy (e.g. the return
// after reviving an idle node, handling of a missing parent, the
// `n := &priorityNode{` line and the body of the maxID check), so parts of
// the summary above are inferred from the surrounding comments.
func (ws *priorityWriteScheduler) OpenStream(streamID uint32, options OpenStreamOptions) {
258
// The stream may be currently idle but cannot be opened or closed.
259
if curr := ws.nodes[streamID]; curr != nil {
260
if curr.state != priorityNodeIdle {
261
panic(fmt.Sprintf("stream %d already opened", streamID))
263
curr.state = priorityNodeOpen
267
// RFC 7540, Section 5.3.5:
268
// "All streams are initially assigned a non-exclusive dependency on stream 0x0.
269
// Pushed streams initially depend on their associated stream. In both cases,
270
// streams are assigned a default weight of 16."
271
parent := ws.nodes[options.PusherID]
276
q: *ws.queuePool.get(),
278
weight: priorityDefaultWeight,
279
state: priorityNodeOpen,
282
ws.nodes[streamID] = n
283
// Track the highest stream ID seen (used by AdjustStream to tell removed
// closed streams from new idle ones).
if streamID > ws.maxID {
288
// CloseStream marks the open stream streamID as closed. It panics for
// stream 0, for an unknown stream, and for a stream that is not open.
// When maxClosedNodesInTree > 0 the closed node is kept in the tree and
// tracked in closedNodes (evicting the oldest if that list is full).
//
// NOTE(review): this copy is missing lines, including the `streamID == 0`
// guard for the first panic and whatever happens when
// maxClosedNodesInTree == 0.
func (ws *priorityWriteScheduler) CloseStream(streamID uint32) {
290
panic("violation of WriteScheduler interface: cannot close stream 0")
292
if ws.nodes[streamID] == nil {
293
panic(fmt.Sprintf("violation of WriteScheduler interface: unknown stream %d", streamID))
295
if ws.nodes[streamID].state != priorityNodeOpen {
296
panic(fmt.Sprintf("violation of WriteScheduler interface: stream %d already closed", streamID))
299
n := ws.nodes[streamID]
300
n.state = priorityNodeClosed
306
// Retain the closed node for better prioritization (RFC 7540 Section 5.3.4).
if ws.maxClosedNodesInTree > 0 {
307
ws.addClosedOrIdleNode(&ws.closedNodes, ws.maxClosedNodesInTree, n)
313
// AdjustStream applies a priority update (dependency, weight and exclusive
// flag; RFC 7540 Section 5.3) to streamID. Adjusting stream 0 panics.
//
// NOTE(review): many lines of this function are missing in this copy (guard
// conditions, early returns, the idle-node literal's opening line and the
// exclusive-flag body), so the step comments below describe only what is
// visible.
func (ws *priorityWriteScheduler) AdjustStream(streamID uint32, priority PriorityParam) {
315
panic("adjustPriority on root")
318
// If streamID does not exist, there are two cases:
319
// - A closed stream that has been removed (this will have ID <= maxID)
320
// - An idle stream that is being used for "grouping" (this will have ID > maxID)
321
n := ws.nodes[streamID]
323
// Removed closed streams, or any unknown stream when idle tracking is
// disabled, are presumably ignored here (the return is missing). Otherwise
// a new idle node is created under the root and tracked in idleNodes.
if streamID <= ws.maxID || ws.maxIdleNodesInTree == 0 {
328
q: *ws.queuePool.get(),
330
weight: priorityDefaultWeight,
331
state: priorityNodeIdle,
333
n.setParent(&ws.root)
334
ws.nodes[streamID] = n
335
ws.addClosedOrIdleNode(&ws.idleNodes, ws.maxIdleNodesInTree, n)
338
// Section 5.3.1: A dependency on a stream that is not currently in the tree
339
// results in that stream being given a default priority (Section 5.3.5).
340
parent := ws.nodes[priority.StreamDep]
342
n.setParent(&ws.root)
343
n.weight = priorityDefaultWeight
347
// Ignore if the client tries to make a node its own parent.
353
// "If a stream is made dependent on one of its own dependencies, the
354
// formerly dependent stream is first moved to be dependent on the
355
// reprioritized stream's previous parent. The moved dependency retains
// its weight."
358
// That is: if parent depends on n, move parent to depend on n.parent.
359
// Walk parent's ancestors looking for n (the comparison is missing in this
// copy); finding it means parent is a descendant of n.
for x := parent.parent; x != nil; x = x.parent {
361
parent.setParent(n.parent)
366
// Section 5.3.3: The exclusive flag causes the stream to become the sole
367
// dependency of its parent stream, causing other dependencies to become
368
// dependent on the exclusive stream.
369
if priority.Exclusive {
381
// Apply the requested weight (zero-based, like priorityNode.weight).
n.weight = priority.Weight
384
// Push queues wr for writing, presumably on the node for wr's stream.
//
// NOTE(review): most of this function is missing in this copy. Only the
// handling of a stream without an open node is partly visible: DATA frames
// panic, and other frames are queued on the root.
func (ws *priorityWriteScheduler) Push(wr FrameWriteRequest) {
392
// id is an idle or closed stream. wr should not be a HEADERS or
393
// DATA frame. In that case, we push wr onto the root, rather
394
// than creating a new priorityNode.
395
if wr.DataSize() > 0 {
396
panic("add DATA on non-open stream")
404
// Pop returns the next frame to write. It walks the tree from the root in
// walkReadyInOrder's priority order and consumes from the first node with
// queued frames. ok is the result of that node queue's consume. The bytes
// written are credited to the node and its ancestors via addBytes, and the
// write-throttle limit is adjusted afterwards.
//
// NOTE(review): several conditions are missing in this copy, e.g. the guard
// before applying writeThrottleLimit (the openParent argument suggests
// `if openParent`) and the branch choosing between growing and resetting
// the limit.
func (ws *priorityWriteScheduler) Pop() (wr FrameWriteRequest, ok bool) {
405
ws.root.walkReadyInOrder(false, &ws.tmp, func(n *priorityNode, openParent bool) bool {
406
limit := int32(math.MaxInt32)
408
limit = ws.writeThrottleLimit
410
wr, ok = n.q.consume(limit)
414
n.addBytes(int64(wr.DataSize()))
415
// If B depends on A and B continuously has data available but A
416
// does not, gradually increase the throttling limit to allow B to
417
// steal more and more bandwidth from A.
419
ws.writeThrottleLimit += 1024
420
// Go signed integer overflow wraps, so a negative value means the
// increment overflowed; clamp to the maximum.
if ws.writeThrottleLimit < 0 {
421
ws.writeThrottleLimit = math.MaxInt32
423
} else if ws.enableWriteThrottle {
424
ws.writeThrottleLimit = 1024
431
// addClosedOrIdleNode appends n to list, treating list as a FIFO bounded
// by maxSize. When the list is already full, its oldest node is removed
// from the tree first.
//
// maxSize must be > 0: with maxSize == 0 and an empty list, (*list)[0]
// would panic. The callers visible in this file check for 0 before calling.
func (ws *priorityWriteScheduler) addClosedOrIdleNode(list *[]*priorityNode, maxSize int, n *priorityNode) {
435
if len(*list) == maxSize {
436
// Remove the oldest node, then shift left.
437
ws.removeNode((*list)[0])
440
// NOTE(review): `x` is not declared anywhere in this copy; the shift-left
// step (presumably `x := (*list)[1:]` and `copy(*list, x)`) is missing.
*list = (*list)[:len(x)]
442
*list = append(*list, n)
445
// removeNode detaches n from the priority tree. Each of n's kids is
// re-parented to n's parent, and n's entry is deleted from ws.nodes.
func (ws *priorityWriteScheduler) removeNode(n *priorityNode) {
446
// Re-parent by repeatedly taking the HEAD of n.kids instead of walking
// k = k.next. setParent inserts the moved node at the head of the new
// parent's kids list, so after a move k.next points into n.parent's list
// rather than at k's former sibling. A next-pointer walk therefore moves
// only the first kid and leaves the rest hanging off n even though n is
// deleted from ws.nodes. Each iteration removes one node from n.kids, so
// this loop terminates.
for n.kids != nil {
447
n.kids.setParent(n.parent)
450
delete(ws.nodes, n.id)