1
// Copyright 2022 The CubeFS Authors.
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
22
"golang.org/x/time/rate"
24
"github.com/cubefs/cubefs/blobstore/common/trace"
25
"github.com/cubefs/cubefs/blobstore/util/limit"
26
"github.com/cubefs/cubefs/blobstore/util/limit/count"
30
_tagLimitedR = "limitedr"
31
_tagLimitedW = "limitedw"
34
// Limiter rps and bps limiter
35
type Limiter interface {
36
// Acquire acquire with one request per second
37
Acquire(name string) error
38
// Release release of one request per second
41
// Reader return io.Reader with bandwidth rate limit
42
Reader(ctx context.Context, r io.Reader) io.Reader
43
// Writer return io.Writer with bandwidth rate limit
44
Writer(ctx context.Context, w io.Writer) io.Writer
46
// Status returns running status
47
// TODO: calculate rate limit wait concurrent
51
// LimitConfig configuration of limiter
52
type LimitConfig struct {
53
NameRps map[string]int `json:"name_rps"` // request with name n/s
54
ReaderMBps int `json:"reader_mbps"` // read with MB/s
55
WriterMBps int `json:"writer_mbps"` // write with MB/s
58
// Status running status
60
Config LimitConfig `json:"config"` // configuration status
61
Running map[string]int `json:"running"` // running request
62
ReadWait int `json:"read_wait"` // wait reading duration
63
WriteWait int `json:"write_wait"` // wait writing duration
66
// Reader limited reader
73
var _ io.Reader = &Reader{}
75
func (r *Reader) Read(p []byte) (n int, err error) {
76
n, err = r.underlying.Read(p)
79
reserve := r.rate.ReserveN(now, n)
82
delay := reserve.DelayFrom(now)
87
span := trace.SpanFromContextSafe(r.ctx)
89
span.Warnf("reader exceeds limiter n:%d, burst:%d", n, r.rate.Burst())
92
t := time.NewTimer(delay)
95
// for access PUT request is Read from client
96
span.SetTag(_tagLimitedW, delay.Milliseconds())
103
// Context was canceled before we could proceed. Cancel the
104
// reservation, which may permit other events to proceed sooner.
111
// Writer limited writer
118
var _ io.Writer = &Writer{}
120
func (w *Writer) Write(p []byte) (n int, err error) {
121
n, err = w.underlying.Write(p)
124
reserve := w.rate.ReserveN(now, n)
127
delay := reserve.DelayFrom(now)
132
span := trace.SpanFromContextSafe(w.ctx)
134
span.Warnf("writer exceeds limiter n:%d, burst:%d", n, w.rate.Burst())
137
t := time.NewTimer(delay)
140
// for access GET request is Write to client
141
span.SetTag(_tagLimitedR, delay.Milliseconds())
148
// Context was canceled before we could proceed. Cancel the
149
// reservation, which may permit other events to proceed sooner.
158
limiters map[string]limit.Limiter
159
rateReader *rate.Limiter
160
rateWriter *rate.Limiter
163
// NewLimiter returns a Limiter
164
func NewLimiter(cfg LimitConfig) Limiter {
168
limiters: make(map[string]limit.Limiter, len(cfg.NameRps)),
171
for name, rps := range cfg.NameRps {
173
lim.limiters[name] = count.New(rps)
177
if cfg.ReaderMBps > 0 {
178
lim.rateReader = rate.NewLimiter(rate.Limit(cfg.ReaderMBps*mb), 2*cfg.ReaderMBps*mb)
180
if cfg.WriterMBps > 0 {
181
lim.rateWriter = rate.NewLimiter(rate.Limit(cfg.WriterMBps*mb), 2*cfg.WriterMBps*mb)
187
func (lim *limiter) Acquire(name string) error {
188
if l := lim.limiters[name]; l != nil {
194
func (lim *limiter) Release(name string) {
195
if l := lim.limiters[name]; l != nil {
200
func (lim *limiter) Reader(ctx context.Context, r io.Reader) io.Reader {
201
if lim.rateReader != nil {
204
rate: lim.rateReader,
211
func (lim *limiter) Writer(ctx context.Context, w io.Writer) io.Writer {
212
if lim.rateWriter != nil {
215
rate: lim.rateWriter,
222
func (lim *limiter) Status() Status {
227
st.Running = make(map[string]int, len(lim.limiters))
228
for name, nl := range lim.limiters {
229
st.Running[name] = nl.Running()
232
st.ReadWait = rateWait(lim.rateReader)
233
st.WriteWait = rateWait(lim.rateWriter)
238
// rateWait get duration of waiting half of limit
239
func rateWait(r *rate.Limiter) int {
244
reserve := r.ReserveN(now, int(r.Limit())/2)
245
duration := reserve.DelayFrom(now)
247
return int(duration.Milliseconds())