cubefs
// Copyright 2014 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
4
5package http26
7import (8"errors"9"fmt"10"sync"11)
12
// Data chunks are recycled through per-size pools to reduce pressure on
// the GC. A handful of size classes keeps the overhead low for servers
// whose request bodies are typically very small.
//
// At worst a dataBuffer wastes 2x the largest size class: that happens
// when it holds multiple chunks and exactly one unread byte sits in both
// the first and the last chunk.
//
// TODO: Benchmark to determine if the pools are necessary. The GC may have
// improved enough that we can instead allocate chunks like this:
// make([]byte, max(16<<10, expectedBytesRemaining))

// dataChunkSizeClasses lists the supported chunk lengths in ascending
// order. Entry i is the length of every chunk handed out by
// dataChunkPools[i].
var dataChunkSizeClasses = []int{
	1 << 10,
	2 << 10,
	4 << 10,
	8 << 10,
	16 << 10,
}

// dataChunkPools holds one pool per entry of dataChunkSizeClasses; the
// two must be kept index-aligned.
var dataChunkPools = [...]sync.Pool{
	{New: func() interface{} { return make([]byte, 1<<10) }},
	{New: func() interface{} { return make([]byte, 2<<10) }},
	{New: func() interface{} { return make([]byte, 4<<10) }},
	{New: func() interface{} { return make([]byte, 8<<10) }},
	{New: func() interface{} { return make([]byte, 16<<10) }},
}
39
40func getDataBufferChunk(size int64) []byte {41i := 042for ; i < len(dataChunkSizeClasses)-1; i++ {43if size <= int64(dataChunkSizeClasses[i]) {44break45}46}47return dataChunkPools[i].Get().([]byte)48}
49
50func putDataBufferChunk(p []byte) {51for i, n := range dataChunkSizeClasses {52if len(p) == n {53dataChunkPools[i].Put(p)54return55}56}57panic(fmt.Sprintf("unexpected buffer len=%v", len(p)))58}
59
// dataBuffer is an io.ReadWriter whose storage is a list of data chunks.
// One dataBuffer is used per stream to read that stream's DATA frames.
// Splitting the storage into chunks lets the server cap the total memory
// used by a single connection without capping the request body size of
// any individual stream.
type dataBuffer struct {
	// chunks holds the buffered data in order, oldest chunk first.
	chunks [][]byte

	// r is the read offset: the next byte to read is chunks[0][r].
	r int

	// w is the write offset: the next byte to write is
	// chunks[len(chunks)-1][w].
	w int

	// size is the total number of buffered bytes.
	size int

	// expected is the minimum number of bytes still anticipated from
	// future Write calls. It is ignored if <= 0.
	expected int64
}
72
// errReadEmpty is the error (*dataBuffer).Read returns when it is called
// while the buffer holds no data.
var errReadEmpty error = errors.New("read from empty dataBuffer")
75// Read copies bytes from the buffer into p.
76// It is an error to read when no data is available.
77func (b *dataBuffer) Read(p []byte) (int, error) {78if b.size == 0 {79return 0, errReadEmpty80}81var ntotal int82for len(p) > 0 && b.size > 0 {83readFrom := b.bytesFromFirstChunk()84n := copy(p, readFrom)85p = p[n:]86ntotal += n87b.r += n88b.size -= n89// If the first chunk has been consumed, advance to the next chunk.90if b.r == len(b.chunks[0]) {91putDataBufferChunk(b.chunks[0])92end := len(b.chunks) - 193copy(b.chunks[:end], b.chunks[1:])94b.chunks[end] = nil95b.chunks = b.chunks[:end]96b.r = 097}98}99return ntotal, nil100}
101
102func (b *dataBuffer) bytesFromFirstChunk() []byte {103if len(b.chunks) == 1 {104return b.chunks[0][b.r:b.w]105}106return b.chunks[0][b.r:]107}
108
109// Len returns the number of bytes of the unread portion of the buffer.
110func (b *dataBuffer) Len() int {111return b.size112}
113
114// Write appends p to the buffer.
115func (b *dataBuffer) Write(p []byte) (int, error) {116ntotal := len(p)117for len(p) > 0 {118// If the last chunk is empty, allocate a new chunk. Try to allocate119// enough to fully copy p plus any additional bytes we expect to120// receive. However, this may allocate less than len(p).121want := int64(len(p))122if b.expected > want {123want = b.expected124}125chunk := b.lastChunkOrAlloc(want)126n := copy(chunk[b.w:], p)127p = p[n:]128b.w += n129b.size += n130b.expected -= int64(n)131}132return ntotal, nil133}
134
135func (b *dataBuffer) lastChunkOrAlloc(want int64) []byte {136if len(b.chunks) != 0 {137last := b.chunks[len(b.chunks)-1]138if b.w < len(last) {139return last140}141}142chunk := getDataBufferChunk(want)143b.chunks = append(b.chunks, chunk)144b.w = 0145return chunk146}
147