cubefs

Форк
0
/
limiter_test.go 
326 строк · 6.3 Кб
1
// Copyright 2022 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package access
16

17
import (
18
	"context"
19
	"io"
20
	"sync"
21
	"testing"
22
	"time"
23

24
	"github.com/stretchr/testify/require"
25
	"golang.org/x/time/rate"
26

27
	"github.com/cubefs/cubefs/blobstore/common/trace"
28
	"github.com/cubefs/cubefs/blobstore/util/limit"
29
)
30

31
type (
32
	limitReader struct {
33
		size int
34
		read int
35
	}
36
	limitWriter struct{}
37
)
38

39
func (r *limitReader) Read(p []byte) (n int, err error) {
40
	if r.read >= r.size {
41
		return 0, io.EOF
42
	}
43

44
	b := make([]byte, 1<<20)
45
	n = copy(p, b)
46
	r.read += n
47
	return
48
}
49

50
func (w *limitWriter) Write(p []byte) (n int, err error) {
51
	b := make([]byte, 1<<20)
52
	n = 0
53
	for len(p) > 0 {
54
		nn := copy(p, b)
55
		n += nn
56
		p = p[nn:]
57
	}
58
	return
59
}
60

61
func TestAccessLimitReader(t *testing.T) {
62
	ctx, cancel := context.WithCancel(ctxWithName("TestAccessLimitReader")())
63
	r := &Reader{
64
		ctx:        ctx,
65
		rate:       rate.NewLimiter(rate.Limit(1<<20), 1<<20),
66
		underlying: &limitReader{size: 1 << 24},
67
	}
68
	errCh := make(chan error)
69
	go func() {
70
		buf := make([]byte, 1<<24)
71
		_, err := io.ReadFull(r, buf)
72
		errCh <- err
73
	}()
74

75
	time.Sleep(100 * time.Millisecond)
76
	cancel()
77

78
	err := <-errCh
79
	require.Error(t, err)
80
}
81

82
func TestAccessLimitWriter(t *testing.T) {
83
	ctx, cancel := context.WithCancel(ctxWithName("TestAccessLimitWriter")())
84
	w := &Writer{
85
		ctx:        ctx,
86
		rate:       rate.NewLimiter(rate.Limit(1<<20), 1<<20),
87
		underlying: &limitWriter{},
88
	}
89
	errCh := make(chan error)
90
	go func() {
91
		w.Write(make([]byte, 1<<20))
92
		_, err := w.Write(make([]byte, 1<<20))
93
		errCh <- err
94
	}()
95

96
	time.Sleep(100 * time.Millisecond)
97
	cancel()
98

99
	err := <-errCh
100
	require.Error(t, err)
101
}
102

103
func TestAccessLimitExceedBurst(t *testing.T) {
104
	ctx := ctxWithName("TestAccessLimitExceedBurst")()
105
	start := time.Now()
106
	{
107
		r := &Reader{
108
			ctx:        ctx,
109
			rate:       rate.NewLimiter(rate.Limit(1<<20), 1<<21),
110
			underlying: &limitReader{size: 1 << 24},
111
		}
112
		_, err := r.Read(make([]byte, 1<<22))
113
		require.NoError(t, err)
114
	}
115
	{
116
		w := &Writer{
117
			ctx:        ctx,
118
			rate:       rate.NewLimiter(rate.Limit(1<<20), 1<<20),
119
			underlying: &limitWriter{},
120
		}
121
		_, err := w.Write(make([]byte, 1<<22))
122
		require.NoError(t, err)
123
	}
124
	require.Greater(t, int64(200), time.Since(start).Milliseconds())
125
}
126

127
func TestAccessLimiterBase(t *testing.T) {
128
	nameGet := "get"
129
	namePut := "put"
130
	cfg := LimitConfig{
131
		NameRps: map[string]int{
132
			nameGet: 1,
133
			namePut: 1,
134
		},
135
		ReaderMBps: 1,
136
		WriterMBps: 1,
137
	}
138
	l := NewLimiter(cfg)
139

140
	{
141
		for range [100]struct{}{} {
142
			err := l.Acquire("")
143
			require.NoError(t, err)
144
		}
145
		l.Release("")
146
	}
147
	{
148
		err := l.Acquire(nameGet)
149
		require.NoError(t, err)
150
		err = l.Acquire(nameGet)
151
		require.Equal(t, limit.ErrLimited, err)
152
		l.Release(nameGet)
153
		err = l.Acquire(nameGet)
154
		require.NoError(t, err)
155
		l.Release(nameGet)
156
	}
157
	{
158
		err := l.Acquire(nameGet)
159
		require.NoError(t, err)
160
		err = l.Acquire(namePut)
161
		require.NoError(t, err)
162

163
		err = l.Acquire(nameGet)
164
		require.Equal(t, limit.ErrLimited, err)
165
		err = l.Acquire(namePut)
166
		require.Equal(t, limit.ErrLimited, err)
167

168
		l.Release(nameGet)
169
		l.Release(namePut)
170
	}
171

172
	{
173
		ctx := ctxWithName("TestAccessLimiterBase")()
174
		rbuff := &limitReader{size: 1 << 8}
175
		r := l.Reader(ctx, rbuff)
176

177
		wbuff := &limitWriter{}
178
		w := l.Writer(ctx, wbuff)
179
		buf := make([]byte, 1<<20)
180
		var wg sync.WaitGroup
181
		wg.Add(1)
182
		go func() {
183
			io.CopyBuffer(w, r, buf)
184
			wg.Done()
185
		}()
186
		wg.Wait()
187

188
		span := trace.SpanFromContextSafe(ctx)
189
		tags := span.Tags()
190
		require.Empty(t, tags, span.TraceID(), tags)
191
	}
192
	{
193
		ctx, cancel := context.WithCancel(ctxWithName("TestAccessLimiterBase")())
194
		rbuff := &limitReader{size: 1 << 30}
195
		r := l.Reader(ctx, rbuff)
196

197
		wbuff := &limitWriter{}
198
		w := l.Writer(ctx, wbuff)
199

200
		var wg sync.WaitGroup
201
		wg.Add(1)
202
		closeCh := make(chan struct{})
203
		go func() {
204
			defer wg.Done()
205

206
			for {
207
				select {
208
				case <-closeCh:
209
					return
210
				default:
211
				}
212

213
				_, err := io.CopyN(w, r, 1<<20)
214
				if err != nil {
215
					return
216
				}
217
			}
218
		}()
219

220
		time.Sleep(time.Second)
221
		cancel()
222
		close(closeCh)
223
		wg.Wait()
224

225
		span := trace.SpanFromContextSafe(ctx)
226
		tags := span.Tags()
227
		require.NotEmpty(t, tags, span.TraceID(), tags)
228
	}
229
}
230

231
func TestAccessLimiterNoop(t *testing.T) {
232
	l := NewLimiter(LimitConfig{
233
		NameRps:    nil,
234
		ReaderMBps: 0,
235
		WriterMBps: 0,
236
	})
237

238
	name := "noop"
239
	err := l.Acquire(name)
240
	require.NoError(t, err)
241
	err = l.Acquire(name)
242
	require.NoError(t, err)
243
	l.Release(name)
244
	l.Release(name)
245

246
	ctx := ctxWithName("TestAccessLimiterNoop")()
247
	rbuff := &limitReader{size: 1 << 10}
248
	r := l.Reader(ctx, rbuff)
249

250
	wbuff := &limitWriter{}
251
	w := l.Writer(ctx, wbuff)
252
	buf := make([]byte, 1<<5)
253
	ch := make(chan struct{})
254
	go func() {
255
		io.CopyBuffer(w, r, buf)
256
		close(ch)
257
	}()
258

259
	<-ch
260
	span := trace.SpanFromContextSafe(ctx)
261
	tags := span.Tags()
262
	require.Empty(t, tags, span.TraceID(), tags)
263
}
264

265
func TestAccessLimiterStatus(t *testing.T) {
266
	{
267
		l := NewLimiter(LimitConfig{
268
			NameRps:    nil,
269
			ReaderMBps: 0,
270
			WriterMBps: 0,
271
		})
272
		for range [100]struct{}{} {
273
			l.Acquire("foo")
274
		}
275
		t.Logf("%+v\n", l.Status())
276
	}
277
	{
278
		ctx := ctxWithName("TestAccessLimiterStatus")()
279
		name := "foo"
280
		l := NewLimiter(LimitConfig{
281
			NameRps:    map[string]int{name: 10},
282
			ReaderMBps: 4,
283
			WriterMBps: 10,
284
		})
285

286
		ch := make(chan struct{})
287
		for range [7]struct{}{} {
288
			go func() {
289
				l.Acquire(name)
290
				<-ch
291
				l.Release(name)
292
			}()
293
		}
294

295
		var wg sync.WaitGroup
296
		wg.Add(1)
297
		go func() {
298
			rbuff := &limitReader{size: 1 << 24}
299
			r := l.Reader(ctx, rbuff)
300
			io.CopyBuffer(&limitWriter{}, r, make([]byte, 1<<20))
301
			wg.Done()
302
		}()
303

304
		w := l.Writer(ctx, &limitWriter{})
305
		wg.Add(8)
306
		for range [8]struct{}{} {
307
			go func() {
308
				defer wg.Done()
309
				for {
310
					select {
311
					case <-ch:
312
						return
313
					default:
314
					}
315
					w.Write(make([]byte, 1<<20))
316
				}
317
			}()
318
		}
319

320
		time.Sleep(time.Second * 2)
321
		t.Logf("%+v\n", l.Status())
322

323
		close(ch)
324
		wg.Wait()
325
	}
326
}
327

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.