cubefs

Форк
0
/
limiter.go 
248 строк · 5.6 Кб
1
// Copyright 2022 The CubeFS Authors.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12
// implied. See the License for the specific language governing
13
// permissions and limitations under the License.
14

15
package access
16

17
import (
18
	"context"
19
	"io"
20
	"time"
21

22
	"golang.org/x/time/rate"
23

24
	"github.com/cubefs/cubefs/blobstore/common/trace"
25
	"github.com/cubefs/cubefs/blobstore/util/limit"
26
	"github.com/cubefs/cubefs/blobstore/util/limit/count"
27
)
28

29
const (
30
	_tagLimitedR = "limitedr"
31
	_tagLimitedW = "limitedw"
32
)
33

34
// Limiter rps and bps limiter
35
type Limiter interface {
36
	// Acquire acquire with one request per second
37
	Acquire(name string) error
38
	// Release release of one request per second
39
	Release(name string)
40

41
	// Reader return io.Reader with bandwidth rate limit
42
	Reader(ctx context.Context, r io.Reader) io.Reader
43
	// Writer return io.Writer with bandwidth rate limit
44
	Writer(ctx context.Context, w io.Writer) io.Writer
45

46
	// Status returns running status
47
	// TODO: calculate rate limit wait concurrent
48
	Status() Status
49
}
50

51
// LimitConfig configuration of limiter
52
type LimitConfig struct {
53
	NameRps    map[string]int `json:"name_rps"`    // request with name n/s
54
	ReaderMBps int            `json:"reader_mbps"` // read with MB/s
55
	WriterMBps int            `json:"writer_mbps"` // write with MB/s
56
}
57

58
// Status running status
59
type Status struct {
60
	Config    LimitConfig    `json:"config"`     // configuration status
61
	Running   map[string]int `json:"running"`    // running request
62
	ReadWait  int            `json:"read_wait"`  // wait reading duration
63
	WriteWait int            `json:"write_wait"` // wait writing duration
64
}
65

66
// Reader limited reader
67
type Reader struct {
68
	ctx        context.Context
69
	rate       *rate.Limiter
70
	underlying io.Reader
71
}
72

73
var _ io.Reader = &Reader{}
74

75
func (r *Reader) Read(p []byte) (n int, err error) {
76
	n, err = r.underlying.Read(p)
77

78
	now := time.Now()
79
	reserve := r.rate.ReserveN(now, n)
80

81
	// Wait if necessary
82
	delay := reserve.DelayFrom(now)
83
	if delay == 0 {
84
		return
85
	}
86

87
	span := trace.SpanFromContextSafe(r.ctx)
88
	if !reserve.OK() {
89
		span.Warnf("reader exceeds limiter n:%d, burst:%d", n, r.rate.Burst())
90
		return
91
	}
92
	t := time.NewTimer(delay)
93
	defer t.Stop()
94

95
	// for access PUT request is Read from client
96
	span.SetTag(_tagLimitedW, delay.Milliseconds())
97

98
	select {
99
	case <-t.C:
100
		// We can proceed.
101
		return
102
	case <-r.ctx.Done():
103
		// Context was canceled before we could proceed.  Cancel the
104
		// reservation, which may permit other events to proceed sooner.
105
		reserve.Cancel()
106
		err = r.ctx.Err()
107
		return
108
	}
109
}
110

111
// Writer limited writer
112
type Writer struct {
113
	ctx        context.Context
114
	rate       *rate.Limiter
115
	underlying io.Writer
116
}
117

118
var _ io.Writer = &Writer{}
119

120
func (w *Writer) Write(p []byte) (n int, err error) {
121
	n, err = w.underlying.Write(p)
122

123
	now := time.Now()
124
	reserve := w.rate.ReserveN(now, n)
125

126
	// Wait if necessary
127
	delay := reserve.DelayFrom(now)
128
	if delay == 0 {
129
		return
130
	}
131

132
	span := trace.SpanFromContextSafe(w.ctx)
133
	if !reserve.OK() {
134
		span.Warnf("writer exceeds limiter n:%d, burst:%d", n, w.rate.Burst())
135
		return
136
	}
137
	t := time.NewTimer(delay)
138
	defer t.Stop()
139

140
	// for access GET request is Write to client
141
	span.SetTag(_tagLimitedR, delay.Milliseconds())
142

143
	select {
144
	case <-t.C:
145
		// We can proceed.
146
		return
147
	case <-w.ctx.Done():
148
		// Context was canceled before we could proceed.  Cancel the
149
		// reservation, which may permit other events to proceed sooner.
150
		reserve.Cancel()
151
		err = w.ctx.Err()
152
		return
153
	}
154
}
155

156
type limiter struct {
157
	config     LimitConfig
158
	limiters   map[string]limit.Limiter
159
	rateReader *rate.Limiter
160
	rateWriter *rate.Limiter
161
}
162

163
// NewLimiter returns a Limiter
164
func NewLimiter(cfg LimitConfig) Limiter {
165
	mb := 1 << 20
166
	lim := &limiter{
167
		config:   cfg,
168
		limiters: make(map[string]limit.Limiter, len(cfg.NameRps)),
169
	}
170

171
	for name, rps := range cfg.NameRps {
172
		if rps > 0 {
173
			lim.limiters[name] = count.New(rps)
174
		}
175
	}
176

177
	if cfg.ReaderMBps > 0 {
178
		lim.rateReader = rate.NewLimiter(rate.Limit(cfg.ReaderMBps*mb), 2*cfg.ReaderMBps*mb)
179
	}
180
	if cfg.WriterMBps > 0 {
181
		lim.rateWriter = rate.NewLimiter(rate.Limit(cfg.WriterMBps*mb), 2*cfg.WriterMBps*mb)
182
	}
183

184
	return lim
185
}
186

187
func (lim *limiter) Acquire(name string) error {
188
	if l := lim.limiters[name]; l != nil {
189
		return l.Acquire()
190
	}
191
	return nil
192
}
193

194
func (lim *limiter) Release(name string) {
195
	if l := lim.limiters[name]; l != nil {
196
		l.Release()
197
	}
198
}
199

200
func (lim *limiter) Reader(ctx context.Context, r io.Reader) io.Reader {
201
	if lim.rateReader != nil {
202
		return &Reader{
203
			ctx:        ctx,
204
			rate:       lim.rateReader,
205
			underlying: r,
206
		}
207
	}
208
	return r
209
}
210

211
func (lim *limiter) Writer(ctx context.Context, w io.Writer) io.Writer {
212
	if lim.rateWriter != nil {
213
		return &Writer{
214
			ctx:        ctx,
215
			rate:       lim.rateWriter,
216
			underlying: w,
217
		}
218
	}
219
	return w
220
}
221

222
func (lim *limiter) Status() Status {
223
	st := Status{
224
		Config: lim.config,
225
	}
226

227
	st.Running = make(map[string]int, len(lim.limiters))
228
	for name, nl := range lim.limiters {
229
		st.Running[name] = nl.Running()
230
	}
231

232
	st.ReadWait = rateWait(lim.rateReader)
233
	st.WriteWait = rateWait(lim.rateWriter)
234

235
	return st
236
}
237

238
// rateWait get duration of waiting half of limit
239
func rateWait(r *rate.Limiter) int {
240
	if r == nil {
241
		return 0
242
	}
243
	now := time.Now()
244
	reserve := r.ReserveN(now, int(r.Limit())/2)
245
	duration := reserve.DelayFrom(now)
246
	reserve.Cancel()
247
	return int(duration.Milliseconds())
248
}
249

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.