1
// Copyright 2022 The CubeFS Authors.
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
7
// http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
24
"github.com/stretchr/testify/require"
25
"golang.org/x/time/rate"
27
"github.com/cubefs/cubefs/blobstore/common/trace"
28
"github.com/cubefs/cubefs/blobstore/util/limit"
39
// Read has the io.Reader method signature, so *limitReader can be handed to
// io helpers such as io.ReadFull and io.CopyBuffer in the tests below.
// Only the header and one allocation are visible in this excerpt.
func (r *limitReader) Read(p []byte) (n int, err error) {
44
// 1<<20 bytes (1 MiB) buffer; how it is used is not visible here.
b := make([]byte, 1<<20)
50
// Write has the io.Writer method signature, so *limitWriter can be used as a
// copy destination in the tests below.
// Only the header and one allocation are visible in this excerpt.
func (w *limitWriter) Write(p []byte) (n int, err error) {
51
// 1<<20 bytes (1 MiB) buffer; how it is used is not visible here.
b := make([]byte, 1<<20)
61
// TestAccessLimitReader reads 1<<24 bytes with io.ReadFull through a reader
// built on a cancellable context, a rate limiter and a limitReader.
// NOTE(review): the cancel call and the final assertion are not visible in
// this excerpt; presumably the read is cancelled mid-way and an error is
// expected — confirm against the full file.
func TestAccessLimitReader(t *testing.T) {
62
ctx, cancel := context.WithCancel(ctxWithName("TestAccessLimitReader")())
65
// Limit of 1<<20 events per second with a burst of 1<<20
// (presumably one token per byte, i.e. 1 MiB/s — confirm).
rate: rate.NewLimiter(rate.Limit(1<<20), 1<<20),
66
// The size field equals the length of buf requested below (1<<24).
underlying: &limitReader{size: 1 << 24},
68
// Unbuffered channel; presumably carries the read error out of a goroutine.
errCh := make(chan error)
70
buf := make([]byte, 1<<24)
71
// io.ReadFull returns an error unless exactly len(buf) bytes are read.
_, err := io.ReadFull(r, buf)
75
// NOTE(review): looks like a grace period before the context is cancelled — confirm.
time.Sleep(100 * time.Millisecond)
82
// TestAccessLimitWriter issues two 1<<20-byte writes through a writer built on
// a cancellable context, a rate limiter and a limitWriter, and ends by
// requiring a non-nil error.
// NOTE(review): the cancel call and the source of the asserted err are not
// visible in this excerpt — confirm against the full file.
func TestAccessLimitWriter(t *testing.T) {
83
ctx, cancel := context.WithCancel(ctxWithName("TestAccessLimitWriter")())
86
// Limit of 1<<20 events per second with a burst of 1<<20.
rate: rate.NewLimiter(rate.Limit(1<<20), 1<<20),
87
underlying: &limitWriter{},
89
// Unbuffered channel; presumably carries the write error out of a goroutine.
errCh := make(chan error)
91
// First write: result deliberately discarded. Its size equals the burst.
w.Write(make([]byte, 1<<20))
92
// Second write of the same size: its error is captured.
_, err := w.Write(make([]byte, 1<<20))
96
// NOTE(review): looks like a grace period before the context is cancelled — confirm.
time.Sleep(100 * time.Millisecond)
100
require.Error(t, err)
103
// TestAccessLimitExceedBurst issues a single Read and a single Write whose
// buffer (1<<22 bytes) is larger than the limiter burst, and requires that
// neither call returns an error. It also bounds the elapsed time.
func TestAccessLimitExceedBurst(t *testing.T) {
104
ctx := ctxWithName("TestAccessLimitExceedBurst")()
109
// Reader case: burst is 1<<21, half of the 1<<22-byte request below.
rate: rate.NewLimiter(rate.Limit(1<<20), 1<<21),
110
underlying: &limitReader{size: 1 << 24},
112
_, err := r.Read(make([]byte, 1<<22))
113
require.NoError(t, err)
118
// Writer case: burst is 1<<20, a quarter of the 1<<22-byte request below.
rate: rate.NewLimiter(rate.Limit(1<<20), 1<<20),
119
underlying: &limitWriter{},
121
_, err := w.Write(make([]byte, 1<<22))
122
require.NoError(t, err)
124
// require.Greater(a, b) asserts a > b, so this requires the elapsed time
// since start (set on a line not shown here) to be below 200 ms.
require.Greater(t, int64(200), time.Since(start).Milliseconds())
127
// TestAccessLimiterBase exercises a limiter l: per-name Acquire calls that are
// expected to succeed or to return limit.ErrLimited, followed by two
// reader-to-writer copies whose trace tags are checked.
// NOTE(review): the limiter construction, sub-test boundaries, release calls
// and goroutine bodies are not visible in this excerpt.
func TestAccessLimiterBase(t *testing.T) {
131
// Per-name limits, keyed by operation name; the entries are not shown here.
NameRps: map[string]int{
141
// Ranging over a [100]struct{} array runs the body 100 times with no index variable.
for range [100]struct{}{} {
143
require.NoError(t, err)
148
// First Acquire for nameGet succeeds; the immediate second one is rejected.
err := l.Acquire(nameGet)
149
require.NoError(t, err)
150
err = l.Acquire(nameGet)
151
require.Equal(t, limit.ErrLimited, err)
153
// NOTE(review): original line 152 is not shown; presumably the slot is
// released there, since this Acquire is expected to succeed — confirm.
err = l.Acquire(nameGet)
154
require.NoError(t, err)
158
// nameGet and namePut are each acquired once successfully...
err := l.Acquire(nameGet)
159
require.NoError(t, err)
160
err = l.Acquire(namePut)
161
require.NoError(t, err)
163
// ...and a further Acquire on either name is rejected with limit.ErrLimited.
err = l.Acquire(nameGet)
164
require.Equal(t, limit.ErrLimited, err)
165
err = l.Acquire(namePut)
166
require.Equal(t, limit.ErrLimited, err)
173
// Copy with a small source (size field 1<<8): no trace tags are expected afterwards.
ctx := ctxWithName("TestAccessLimiterBase")()
174
rbuff := &limitReader{size: 1 << 8}
175
r := l.Reader(ctx, rbuff)
177
wbuff := &limitWriter{}
178
w := l.Writer(ctx, wbuff)
179
buf := make([]byte, 1<<20)
180
var wg sync.WaitGroup
183
// The copy result and error are discarded.
io.CopyBuffer(w, r, buf)
188
span := trace.SpanFromContextSafe(ctx)
190
// The trace ID and tags are passed as the failure message arguments.
require.Empty(t, tags, span.TraceID(), tags)
193
// Copy with a large source (size field 1<<30) on a cancellable context:
// trace tags are expected to be non-empty afterwards.
ctx, cancel := context.WithCancel(ctxWithName("TestAccessLimiterBase")())
194
rbuff := &limitReader{size: 1 << 30}
195
r := l.Reader(ctx, rbuff)
197
wbuff := &limitWriter{}
198
w := l.Writer(ctx, wbuff)
200
var wg sync.WaitGroup
202
closeCh := make(chan struct{})
213
// io.CopyN copies exactly 1<<20 bytes or returns an error.
_, err := io.CopyN(w, r, 1<<20)
220
// NOTE(review): presumably lets the copy run for a second before it is stopped — confirm.
time.Sleep(time.Second)
225
span := trace.SpanFromContextSafe(ctx)
227
require.NotEmpty(t, tags, span.TraceID(), tags)
231
// TestAccessLimiterNoop builds a limiter from a LimitConfig whose fields are
// not visible here, then checks that two consecutive Acquire calls on the same
// name both succeed and that a reader-to-writer copy leaves no trace tags.
// NOTE(review): presumably the config is empty so that nothing is limited — confirm.
func TestAccessLimiterNoop(t *testing.T) {
232
l := NewLimiter(LimitConfig{
239
// Unlike in TestAccessLimiterBase, the second Acquire is expected to succeed too.
err := l.Acquire(name)
240
require.NoError(t, err)
241
err = l.Acquire(name)
242
require.NoError(t, err)
246
ctx := ctxWithName("TestAccessLimiterNoop")()
247
rbuff := &limitReader{size: 1 << 10}
248
r := l.Reader(ctx, rbuff)
250
wbuff := &limitWriter{}
251
w := l.Writer(ctx, wbuff)
252
// 32-byte copy buffer (1<<5).
buf := make([]byte, 1<<5)
253
ch := make(chan struct{})
255
// The copy result and error are discarded.
io.CopyBuffer(w, r, buf)
260
span := trace.SpanFromContextSafe(ctx)
262
// The trace ID and tags are passed as the failure message arguments.
require.Empty(t, tags, span.TraceID(), tags)
265
func TestAccessLimiterStatus(t *testing.T) {
267
l := NewLimiter(LimitConfig{
272
for range [100]struct{}{} {
275
t.Logf("%+v\n", l.Status())
278
ctx := ctxWithName("TestAccessLimiterStatus")()
280
l := NewLimiter(LimitConfig{
281
NameRps: map[string]int{name: 10},
286
ch := make(chan struct{})
287
for range [7]struct{}{} {
295
var wg sync.WaitGroup
298
rbuff := &limitReader{size: 1 << 24}
299
r := l.Reader(ctx, rbuff)
300
io.CopyBuffer(&limitWriter{}, r, make([]byte, 1<<20))
304
w := l.Writer(ctx, &limitWriter{})
306
for range [8]struct{}{} {
315
w.Write(make([]byte, 1<<20))
320
time.Sleep(time.Second * 2)
321
t.Logf("%+v\n", l.Status())