podman
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
4
5package http26
7import (8"fmt"9"math"10)
11
12type roundRobinWriteScheduler struct {13// control contains control frames (SETTINGS, PING, etc.).14control writeQueue
15
16// streams maps stream ID to a queue.17streams map[uint32]*writeQueue18
19// stream queues are stored in a circular linked list.20// head is the next stream to write, or nil if there are no streams open.21head *writeQueue22
23// pool of empty queues for reuse.24queuePool writeQueuePool
25}
26
27// newRoundRobinWriteScheduler constructs a new write scheduler.
28// The round robin scheduler priorizes control frames
29// like SETTINGS and PING over DATA frames.
30// When there are no control frames to send, it performs a round-robin
31// selection from the ready streams.
32func newRoundRobinWriteScheduler() WriteScheduler {33ws := &roundRobinWriteScheduler{34streams: make(map[uint32]*writeQueue),35}36return ws37}
38
39func (ws *roundRobinWriteScheduler) OpenStream(streamID uint32, options OpenStreamOptions) {40if ws.streams[streamID] != nil {41panic(fmt.Errorf("stream %d already opened", streamID))42}43q := ws.queuePool.get()44ws.streams[streamID] = q45if ws.head == nil {46ws.head = q47q.next = q48q.prev = q49} else {50// Queues are stored in a ring.51// Insert the new stream before ws.head, putting it at the end of the list.52q.prev = ws.head.prev53q.next = ws.head54q.prev.next = q55q.next.prev = q56}57}
58
59func (ws *roundRobinWriteScheduler) CloseStream(streamID uint32) {60q := ws.streams[streamID]61if q == nil {62return63}64if q.next == q {65// This was the only open stream.66ws.head = nil67} else {68q.prev.next = q.next69q.next.prev = q.prev70if ws.head == q {71ws.head = q.next72}73}74delete(ws.streams, streamID)75ws.queuePool.put(q)76}
77
78func (ws *roundRobinWriteScheduler) AdjustStream(streamID uint32, priority PriorityParam) {}79
80func (ws *roundRobinWriteScheduler) Push(wr FrameWriteRequest) {81if wr.isControl() {82ws.control.push(wr)83return84}85q := ws.streams[wr.StreamID()]86if q == nil {87// This is a closed stream.88// wr should not be a HEADERS or DATA frame.89// We push the request onto the control queue.90if wr.DataSize() > 0 {91panic("add DATA on non-open stream")92}93ws.control.push(wr)94return95}96q.push(wr)97}
98
99func (ws *roundRobinWriteScheduler) Pop() (FrameWriteRequest, bool) {100// Control and RST_STREAM frames first.101if !ws.control.empty() {102return ws.control.shift(), true103}104if ws.head == nil {105return FrameWriteRequest{}, false106}107q := ws.head108for {109if wr, ok := q.consume(math.MaxInt32); ok {110ws.head = q.next111return wr, true112}113q = q.next114if q == ws.head {115break116}117}118return FrameWriteRequest{}, false119}
120