podman

Форк
0
/x
/
writesched_priority.go 
451 строка · 13.3 Кб
1
// Copyright 2016 The Go Authors. All rights reserved.
2
// Use of this source code is governed by a BSD-style
3
// license that can be found in the LICENSE file.
4

5
package http2
6

7
import (
8
	"fmt"
9
	"math"
10
	"sort"
11
)
12

13
// priorityDefaultWeight is the stored weight given to streams that have not
// been assigned an explicit priority. Node weights are stored as the actual
// weight minus one, so this encodes the default weight of 16 defined by
// RFC 7540, Section 5.3.5.
const priorityDefaultWeight = 16 - 1
15

16
// PriorityWriteSchedulerConfig holds the tunables accepted by
// NewPriorityWriteScheduler.
type PriorityWriteSchedulerConfig struct {
	// MaxClosedNodesInTree is the upper bound on how many closed streams
	// are kept in the priority tree. A value of zero drops closed streams
	// immediately, which saves a little memory but can hurt prioritization.
	//
	// Background, from RFC 7540, Section 5.3.4:
	//   "It is possible for a stream to become closed while prioritization
	//   information ... is in transit. ... This potentially creates suboptimal
	//   prioritization, since the stream could be given a priority that is
	//   different from what is intended. To avoid these problems, an endpoint
	//   SHOULD retain stream prioritization state for a period after streams
	//   become closed. The longer state is retained, the lower the chance that
	//   streams are assigned incorrect or default priority values."
	MaxClosedNodesInTree int

	// MaxIdleNodesInTree is the upper bound on how many idle streams are
	// kept in the priority tree. A value of zero means idle streams are
	// never added to the tree, which saves a little memory but can hurt
	// prioritization.
	//
	// Background, from RFC 7540, Section 5.3.4:
	//   "Similarly, streams that are in the "idle" state can be assigned
	//   priority or become a parent of other streams. This allows for the
	//   creation of a grouping node in the dependency tree, which enables
	//   more flexible expressions of priority. Idle streams begin with a
	//   default priority (Section 5.3.5)."
	MaxIdleNodesInTree int

	// ThrottleOutOfOrderWrites turns on write throttling, which helps data
	// go out in priority order. The problem it addresses is a race: stream
	// B depends on stream A and both are about to call Write to queue DATA
	// frames. Should B get there first, a naive scheduler would flush as
	// much of B's data as it can even though A has higher priority. When
	// throttling is on, only a small amount of B's data is written, which
	// limits how much bandwidth B can take away from A.
	ThrottleOutOfOrderWrites bool
}
54

55
// NewPriorityWriteScheduler constructs a WriteScheduler that schedules
56
// frames by following HTTP/2 priorities as described in RFC 7540 Section 5.3.
57
// If cfg is nil, default options are used.
58
func NewPriorityWriteScheduler(cfg *PriorityWriteSchedulerConfig) WriteScheduler {
59
	if cfg == nil {
60
		// For justification of these defaults, see:
61
		// https://docs.google.com/document/d/1oLhNg1skaWD4_DtaoCxdSRN5erEXrH-KnLrMwEpOtFY
62
		cfg = &PriorityWriteSchedulerConfig{
63
			MaxClosedNodesInTree:     10,
64
			MaxIdleNodesInTree:       10,
65
			ThrottleOutOfOrderWrites: false,
66
		}
67
	}
68

69
	ws := &priorityWriteScheduler{
70
		nodes:                make(map[uint32]*priorityNode),
71
		maxClosedNodesInTree: cfg.MaxClosedNodesInTree,
72
		maxIdleNodesInTree:   cfg.MaxIdleNodesInTree,
73
		enableWriteThrottle:  cfg.ThrottleOutOfOrderWrites,
74
	}
75
	ws.nodes[0] = &ws.root
76
	if cfg.ThrottleOutOfOrderWrites {
77
		ws.writeThrottleLimit = 1024
78
	} else {
79
		ws.writeThrottleLimit = math.MaxInt32
80
	}
81
	return ws
82
}
83

84
// priorityNodeState is the lifecycle state of a priorityNode.
type priorityNodeState int

const (
	// priorityNodeOpen is the state of a stream that has been opened and
	// not yet closed. It is the zero value.
	priorityNodeOpen priorityNodeState = iota
	// priorityNodeClosed is the state of a stream after CloseStream.
	priorityNodeClosed
	// priorityNodeIdle is the state of a node created by AdjustStream for
	// a stream that has not been opened yet.
	priorityNodeIdle
)
91

92
// priorityNode is a node in an HTTP/2 priority tree.
93
// Each node is associated with a single stream ID.
94
// See RFC 7540, Section 5.3.
95
type priorityNode struct {
96
	q            writeQueue        // queue of pending frames to write
97
	id           uint32            // id of the stream, or 0 for the root of the tree
98
	weight       uint8             // the actual weight is weight+1, so the value is in [1,256]
99
	state        priorityNodeState // open | closed | idle
100
	bytes        int64             // number of bytes written by this node, or 0 if closed
101
	subtreeBytes int64             // sum(node.bytes) of all nodes in this subtree
102

103
	// These links form the priority tree.
104
	parent     *priorityNode
105
	kids       *priorityNode // start of the kids list
106
	prev, next *priorityNode // doubly-linked list of siblings
107
}
108

109
// setParent moves n so that it becomes a child of parent. A nil parent
// detaches n from the tree. Re-parenting to the current parent is a no-op,
// and making a node its own parent panics.
//
// n is always placed at the front of parent's child list; walkReadyInOrder
// depends on that insertion position.
func (n *priorityNode) setParent(parent *priorityNode) {
	switch {
	case n == parent:
		panic("setParent to self")
	case n.parent == parent:
		return // nothing to do
	}

	// Splice n out of the sibling list it currently belongs to, if any.
	if old := n.parent; old != nil {
		if n.prev != nil {
			n.prev.next = n.next
		} else {
			// n was the first child, so the list head moves forward.
			old.kids = n.next
		}
		if n.next != nil {
			n.next.prev = n.prev
		}
	}

	n.parent = parent
	n.prev = nil
	if parent == nil {
		// Detached: no siblings remain.
		n.next = nil
		return
	}

	// Push n onto the front of the new parent's child list.
	head := parent.kids
	n.next = head
	if head != nil {
		head.prev = n
	}
	parent.kids = n
}
143

144
// addBytes adds b to the byte count of n and to the subtree byte count of n
// and every one of its ancestors. b may be negative.
func (n *priorityNode) addBytes(b int64) {
	n.bytes += b
	for anc := n; anc != nil; anc = anc.parent {
		anc.subtreeBytes += b
	}
}
150

151
// walkReadyInOrder iterates over the tree in priority order, calling f for each node
152
// with a non-empty write queue. When f returns true, this function returns true and the
153
// walk halts. tmp is used as scratch space for sorting.
154
//
155
// f(n, openParent) takes two arguments: the node to visit, n, and a bool that is true
156
// if any ancestor p of n is still open (ignoring the root node).
157
func (n *priorityNode) walkReadyInOrder(openParent bool, tmp *[]*priorityNode, f func(*priorityNode, bool) bool) bool {
158
	if !n.q.empty() && f(n, openParent) {
159
		return true
160
	}
161
	if n.kids == nil {
162
		return false
163
	}
164

165
	// Don't consider the root "open" when updating openParent since
166
	// we can't send data frames on the root stream (only control frames).
167
	if n.id != 0 {
168
		openParent = openParent || (n.state == priorityNodeOpen)
169
	}
170

171
	// Common case: only one kid or all kids have the same weight.
172
	// Some clients don't use weights; other clients (like web browsers)
173
	// use mostly-linear priority trees.
174
	w := n.kids.weight
175
	needSort := false
176
	for k := n.kids.next; k != nil; k = k.next {
177
		if k.weight != w {
178
			needSort = true
179
			break
180
		}
181
	}
182
	if !needSort {
183
		for k := n.kids; k != nil; k = k.next {
184
			if k.walkReadyInOrder(openParent, tmp, f) {
185
				return true
186
			}
187
		}
188
		return false
189
	}
190

191
	// Uncommon case: sort the child nodes. We remove the kids from the parent,
192
	// then re-insert after sorting so we can reuse tmp for future sort calls.
193
	*tmp = (*tmp)[:0]
194
	for n.kids != nil {
195
		*tmp = append(*tmp, n.kids)
196
		n.kids.setParent(nil)
197
	}
198
	sort.Sort(sortPriorityNodeSiblings(*tmp))
199
	for i := len(*tmp) - 1; i >= 0; i-- {
200
		(*tmp)[i].setParent(n) // setParent inserts at the head of n.kids
201
	}
202
	for k := n.kids; k != nil; k = k.next {
203
		if k.walkReadyInOrder(openParent, tmp, f) {
204
			return true
205
		}
206
	}
207
	return false
208
}
209

210
type sortPriorityNodeSiblings []*priorityNode
211

212
func (z sortPriorityNodeSiblings) Len() int      { return len(z) }
213
func (z sortPriorityNodeSiblings) Swap(i, k int) { z[i], z[k] = z[k], z[i] }
214
func (z sortPriorityNodeSiblings) Less(i, k int) bool {
215
	// Prefer the subtree that has sent fewer bytes relative to its weight.
216
	// See sections 5.3.2 and 5.3.4.
217
	wi, bi := float64(z[i].weight+1), float64(z[i].subtreeBytes)
218
	wk, bk := float64(z[k].weight+1), float64(z[k].subtreeBytes)
219
	if bi == 0 && bk == 0 {
220
		return wi >= wk
221
	}
222
	if bk == 0 {
223
		return false
224
	}
225
	return bi/bk <= wi/wk
226
}
227

228
type priorityWriteScheduler struct {
229
	// root is the root of the priority tree, where root.id = 0.
230
	// The root queues control frames that are not associated with any stream.
231
	root priorityNode
232

233
	// nodes maps stream ids to priority tree nodes.
234
	nodes map[uint32]*priorityNode
235

236
	// maxID is the maximum stream id in nodes.
237
	maxID uint32
238

239
	// lists of nodes that have been closed or are idle, but are kept in
240
	// the tree for improved prioritization. When the lengths exceed either
241
	// maxClosedNodesInTree or maxIdleNodesInTree, old nodes are discarded.
242
	closedNodes, idleNodes []*priorityNode
243

244
	// From the config.
245
	maxClosedNodesInTree int
246
	maxIdleNodesInTree   int
247
	writeThrottleLimit   int32
248
	enableWriteThrottle  bool
249

250
	// tmp is scratch space for priorityNode.walkReadyInOrder to reduce allocations.
251
	tmp []*priorityNode
252

253
	// pool of empty queues for reuse.
254
	queuePool writeQueuePool
255
}
256

257
func (ws *priorityWriteScheduler) OpenStream(streamID uint32, options OpenStreamOptions) {
258
	// The stream may be currently idle but cannot be opened or closed.
259
	if curr := ws.nodes[streamID]; curr != nil {
260
		if curr.state != priorityNodeIdle {
261
			panic(fmt.Sprintf("stream %d already opened", streamID))
262
		}
263
		curr.state = priorityNodeOpen
264
		return
265
	}
266

267
	// RFC 7540, Section 5.3.5:
268
	//  "All streams are initially assigned a non-exclusive dependency on stream 0x0.
269
	//  Pushed streams initially depend on their associated stream. In both cases,
270
	//  streams are assigned a default weight of 16."
271
	parent := ws.nodes[options.PusherID]
272
	if parent == nil {
273
		parent = &ws.root
274
	}
275
	n := &priorityNode{
276
		q:      *ws.queuePool.get(),
277
		id:     streamID,
278
		weight: priorityDefaultWeight,
279
		state:  priorityNodeOpen,
280
	}
281
	n.setParent(parent)
282
	ws.nodes[streamID] = n
283
	if streamID > ws.maxID {
284
		ws.maxID = streamID
285
	}
286
}
287

288
// CloseStream marks streamID as closed and releases its write queue.
//
// The node's byte count is subtracted from the tree so that closed streams
// no longer contribute to subtree totals. The node itself stays in the tree,
// tracked in closedNodes, when maxClosedNodesInTree is positive; otherwise
// it is removed right away.
//
// CloseStream panics if streamID is 0, is not in the tree, or is not open.
func (ws *priorityWriteScheduler) CloseStream(streamID uint32) {
	if streamID == 0 {
		panic("violation of WriteScheduler interface: cannot close stream 0")
	}
	n := ws.nodes[streamID]
	switch {
	case n == nil:
		panic(fmt.Sprintf("violation of WriteScheduler interface: unknown stream %d", streamID))
	case n.state != priorityNodeOpen:
		panic(fmt.Sprintf("violation of WriteScheduler interface: stream %d already closed", streamID))
	}

	n.state = priorityNodeClosed
	n.addBytes(-n.bytes)

	// Hand a copy of the queue back to the pool and clear the node's own
	// reference to the queue's backing storage.
	q := n.q
	ws.queuePool.put(&q)
	n.q.s = nil

	if ws.maxClosedNodesInTree <= 0 {
		ws.removeNode(n)
		return
	}
	ws.addClosedOrIdleNode(&ws.closedNodes, ws.maxClosedNodesInTree, n)
}
312

313
func (ws *priorityWriteScheduler) AdjustStream(streamID uint32, priority PriorityParam) {
314
	if streamID == 0 {
315
		panic("adjustPriority on root")
316
	}
317

318
	// If streamID does not exist, there are two cases:
319
	// - A closed stream that has been removed (this will have ID <= maxID)
320
	// - An idle stream that is being used for "grouping" (this will have ID > maxID)
321
	n := ws.nodes[streamID]
322
	if n == nil {
323
		if streamID <= ws.maxID || ws.maxIdleNodesInTree == 0 {
324
			return
325
		}
326
		ws.maxID = streamID
327
		n = &priorityNode{
328
			q:      *ws.queuePool.get(),
329
			id:     streamID,
330
			weight: priorityDefaultWeight,
331
			state:  priorityNodeIdle,
332
		}
333
		n.setParent(&ws.root)
334
		ws.nodes[streamID] = n
335
		ws.addClosedOrIdleNode(&ws.idleNodes, ws.maxIdleNodesInTree, n)
336
	}
337

338
	// Section 5.3.1: A dependency on a stream that is not currently in the tree
339
	// results in that stream being given a default priority (Section 5.3.5).
340
	parent := ws.nodes[priority.StreamDep]
341
	if parent == nil {
342
		n.setParent(&ws.root)
343
		n.weight = priorityDefaultWeight
344
		return
345
	}
346

347
	// Ignore if the client tries to make a node its own parent.
348
	if n == parent {
349
		return
350
	}
351

352
	// Section 5.3.3:
353
	//   "If a stream is made dependent on one of its own dependencies, the
354
	//   formerly dependent stream is first moved to be dependent on the
355
	//   reprioritized stream's previous parent. The moved dependency retains
356
	//   its weight."
357
	//
358
	// That is: if parent depends on n, move parent to depend on n.parent.
359
	for x := parent.parent; x != nil; x = x.parent {
360
		if x == n {
361
			parent.setParent(n.parent)
362
			break
363
		}
364
	}
365

366
	// Section 5.3.3: The exclusive flag causes the stream to become the sole
367
	// dependency of its parent stream, causing other dependencies to become
368
	// dependent on the exclusive stream.
369
	if priority.Exclusive {
370
		k := parent.kids
371
		for k != nil {
372
			next := k.next
373
			if k != n {
374
				k.setParent(n)
375
			}
376
			k = next
377
		}
378
	}
379

380
	n.setParent(parent)
381
	n.weight = priority.Weight
382
}
383

384
// Push queues wr for writing. Control frames go onto the root node's queue;
// every other frame goes onto the queue of its stream's node.
//
// A frame for a stream that has no node in the tree is also queued on the
// root, provided it carries no data. Push panics if such a frame has a
// positive data size.
func (ws *priorityWriteScheduler) Push(wr FrameWriteRequest) {
	target := &ws.root
	if !wr.isControl() {
		if n := ws.nodes[wr.StreamID()]; n != nil {
			target = n
		} else if wr.DataSize() > 0 {
			// The stream is idle or closed, so wr should not be a
			// HEADERS or DATA frame. Anything else is queued on the
			// root instead of creating a new priorityNode.
			panic("add DATA on non-open stream")
		}
	}
	target.q.push(wr)
}
403

404
// Pop walks the priority tree and returns the first write request that a
// node with a non-empty queue can supply. ok is false when no node yields
// a request.
//
// A node that has an open ancestor is asked to consume at most
// writeThrottleLimit bytes; any other node is asked with the largest int32
// as the limit.
func (ws *priorityWriteScheduler) Pop() (wr FrameWriteRequest, ok bool) {
	visit := func(n *priorityNode, openParent bool) bool {
		limit := int32(math.MaxInt32)
		if openParent {
			limit = ws.writeThrottleLimit
		}
		// Plain assignment: the results land in Pop's named return values.
		if wr, ok = n.q.consume(limit); !ok {
			return false
		}
		n.addBytes(int64(wr.DataSize()))

		switch {
		case openParent:
			// When B depends on A and B keeps having data while A has
			// none, raise the limit step by step so that B may take
			// progressively more of A's bandwidth.
			ws.writeThrottleLimit += 1024
			if ws.writeThrottleLimit < 0 {
				// The addition wrapped around; clamp to the maximum.
				ws.writeThrottleLimit = math.MaxInt32
			}
		case ws.enableWriteThrottle:
			ws.writeThrottleLimit = 1024
		}
		return true
	}
	ws.root.walkReadyInOrder(false, &ws.tmp, visit)
	return wr, ok
}
430

431
// addClosedOrIdleNode appends n to *list, which holds at most maxSize nodes
// ordered oldest first. When the list is already full, its oldest node is
// removed from the tree and dropped from the list to make room. A maxSize
// of zero makes the call a no-op.
func (ws *priorityWriteScheduler) addClosedOrIdleNode(list *[]*priorityNode, maxSize int, n *priorityNode) {
	if maxSize == 0 {
		return
	}
	nodes := *list
	if len(nodes) == maxSize {
		// Evict the oldest entry, then slide the rest down by one slot.
		ws.removeNode(nodes[0])
		nodes = nodes[:copy(nodes, nodes[1:])]
	}
	*list = append(nodes, n)
}
444

445
// removeNode takes n out of the priority tree and out of the nodes map.
// Every child of n is re-parented to n's parent first, so no part of the
// subtree is left hanging off the removed node.
func (ws *priorityWriteScheduler) removeNode(n *priorityNode) {
	// setParent unlinks the child from n.kids and rewrites its sibling
	// links, so a loop that follows k.next after the call would wander
	// into the new parent's child list and skip n's remaining children.
	// Taking the head of n.kids until the list is empty moves them all.
	for n.kids != nil {
		n.kids.setParent(n.parent)
	}
	n.setParent(nil)
	delete(ws.nodes, n.id)
}
452

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.