Dragonfly2

traffic_shaper.go
273 lines · 8.0 KB
/*
 *     Copyright 2022 The Dragonfly Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package peer

import (
	"sync"
	"time"

	"go.uber.org/atomic"
	"golang.org/x/time/rate"

	logger "d7y.io/dragonfly/v2/internal/dflog"
	"d7y.io/dragonfly/v2/pkg/math"
)

const (
	TypePlainTrafficShaper    = "plain"
	TypeSamplingTrafficShaper = "sampling"
)

// TrafficShaper allocates bandwidth for running tasks dynamically
type TrafficShaper interface {
	// Start starts the TrafficShaper
	Start()
	// Stop stops the TrafficShaper
	Stop()
	// AddTask starts managing a new task
	AddTask(taskID string, ptc *peerTaskConductor)
	// RemoveTask removes a completed task
	RemoveTask(taskID string)
	// Record records the task's used bandwidth
	Record(taskID string, n int)
	// GetBandwidth gets the total download bandwidth in the past second
	GetBandwidth() int64
}

func NewTrafficShaper(trafficShaperType string, totalRateLimit rate.Limit, computePieceSize func(int64) uint32) TrafficShaper {
	var ts TrafficShaper
	switch trafficShaperType {
	case TypeSamplingTrafficShaper:
		ts = NewSamplingTrafficShaper(totalRateLimit, computePieceSize)
	case TypePlainTrafficShaper:
		ts = NewPlainTrafficShaper()
	default:
		logger.Warnf("type \"%s\" doesn't exist, use plain traffic shaper instead", trafficShaperType)
		ts = NewPlainTrafficShaper()
	}
	return ts
}
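// Illustrative lifecycle sketch: the rate limit, computePieceSize, taskID, ptc
// and n below are placeholder values; only the calls themselves come from the
// TrafficShaper interface above. A caller in this package might drive a shaper
// roughly like this:
//
//	shaper := NewTrafficShaper(TypeSamplingTrafficShaper, rate.Limit(100*1024*1024), computePieceSize)
//	shaper.Start()
//	defer shaper.Stop()
//	shaper.AddTask(taskID, ptc)   // register a running peer task
//	shaper.Record(taskID, n)      // report n bytes downloaded by the task
//	shaper.RemoveTask(taskID)     // unregister once the task completes
//	_ = shaper.GetBandwidth()     // total bytes recorded in the past second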

type plainTrafficShaper struct {
	// total used bandwidth in the past second
	lastSecondBandwidth *atomic.Int64
	// total used bandwidth in the current second
	usingBandWidth *atomic.Int64
	stopCh         chan struct{}
}

func NewPlainTrafficShaper() TrafficShaper {
	return &plainTrafficShaper{
		lastSecondBandwidth: atomic.NewInt64(0),
		usingBandWidth:      atomic.NewInt64(0),
		stopCh:              make(chan struct{}),
	}
}

func (ts *plainTrafficShaper) Start() {
	go func() {
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				ts.lastSecondBandwidth.Store(ts.usingBandWidth.Load())
				ts.usingBandWidth.Store(0)
			case <-ts.stopCh:
				return
			}
		}
	}()
}

func (ts *plainTrafficShaper) Stop() {
	close(ts.stopCh)
}

func (ts *plainTrafficShaper) AddTask(_ string, _ *peerTaskConductor) {
}

func (ts *plainTrafficShaper) RemoveTask(_ string) {
}

func (ts *plainTrafficShaper) Record(_ string, n int) {
	ts.usingBandWidth.Add(int64(n))
}

func (ts *plainTrafficShaper) GetBandwidth() int64 {
	return ts.lastSecondBandwidth.Load()
}

type taskEntry struct {
	ptc       *peerTaskConductor
	pieceSize uint32
	// used bandwidth in the past second
	lastSecondBandwidth *atomic.Int64
	// bandwidth needed in the next second
	needBandwidth int64
	// indicates whether the bandwidth needs to be updated; tasks added within the last second don't need to be updated
	needUpdate bool
}

type samplingTrafficShaper struct {
	*logger.SugaredLoggerOnWith
	sync.RWMutex
	computePieceSize func(int64) uint32
	totalRateLimit   rate.Limit
	// total used bandwidth in the past second
	lastSecondBandwidth *atomic.Int64
	// total used bandwidth in the current second
	usingBandWidth *atomic.Int64
	tasks          map[string]*taskEntry
	stopCh         chan struct{}
}

func NewSamplingTrafficShaper(totalRateLimit rate.Limit, computePieceSize func(int64) uint32) TrafficShaper {
	log := logger.With("component", "TrafficShaper")
	return &samplingTrafficShaper{
		SugaredLoggerOnWith: log,
		computePieceSize:    computePieceSize,
		totalRateLimit:      totalRateLimit,
		lastSecondBandwidth: atomic.NewInt64(0),
		usingBandWidth:      atomic.NewInt64(0),
		tasks:               make(map[string]*taskEntry),
		stopCh:              make(chan struct{}),
	}
}

func (ts *samplingTrafficShaper) Start() {
	go func() {
		// roll over the bandwidth counters and update every task's limit once per second
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				ts.lastSecondBandwidth.Store(ts.usingBandWidth.Load())
				ts.usingBandWidth.Store(0)
				ts.updateLimit()
			case <-ts.stopCh:
				return
			}
		}
	}()
}

func (ts *samplingTrafficShaper) Stop() {
	close(ts.stopCh)
}

// updateLimit updates every task's limit every second
func (ts *samplingTrafficShaper) updateLimit() {
	var totalNeedBandwidth int64
	var totalLeastBandwidth int64
	ts.RLock()
	defer ts.RUnlock()
	// estimate each task's needed bandwidth for the next second
	for _, te := range ts.tasks {
		oldLimit := int64(te.ptc.limiter.Limit())
		needBandwidth := te.lastSecondBandwidth.Swap(0)
		if !te.needUpdate {
			// if this task was added within the last second, don't reduce its limit this time
			te.needUpdate = true
			needBandwidth = math.Max(needBandwidth, oldLimit)
		}
		if contentLength := te.ptc.contentLength.Load(); contentLength > 0 {
			remainingLength := contentLength - te.ptc.completedLength.Load()
			needBandwidth = math.Min(remainingLength, needBandwidth)
		}
		// delta bandwidth beyond one piece; make sure it's not negative
		needBandwidth = math.Max(needBandwidth-int64(te.pieceSize), int64(0))
		te.needBandwidth = needBandwidth
		totalNeedBandwidth += needBandwidth
		totalLeastBandwidth += int64(te.pieceSize)
	}

	// allocate delta bandwidth for tasks
	for _, te := range ts.tasks {
		// diffLimit indicates the difference between the allocated bandwidth and pieceSize
		var diffLimit float64
		// make sure new limit is not smaller than pieceSize
		diffLimit = math.Max(
			(float64(ts.totalRateLimit)-float64(totalLeastBandwidth))*(float64(te.needBandwidth)/float64(totalNeedBandwidth)), 0)
		te.ptc.limiter.SetLimit(rate.Limit(diffLimit + float64(te.pieceSize)))
		ts.Debugf("period update limit, task %s, need bandwidth %d, diff rate limit %f", te.ptc.taskID, te.needBandwidth, diffLimit)
	}
}
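// Worked example with hypothetical numbers for the allocation above: with
// totalRateLimit = 100, two tasks whose pieceSize is 10 each
// (totalLeastBandwidth = 20) and needBandwidth values of 30 and 50
// (totalNeedBandwidth = 80), the deltas are (100-20)*30/80 = 30 and
// (100-20)*50/80 = 50, so the new limits become 30+10 = 40 and 50+10 = 60,
// which together consume exactly the total rate limit of 100.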

func (ts *samplingTrafficShaper) AddTask(taskID string, ptc *peerTaskConductor) {
	ts.Lock()
	defer ts.Unlock()
	nTasks := len(ts.tasks)
	if nTasks == 0 {
		nTasks++
	}
	pieceSize := ts.computePieceSize(ptc.contentLength.Load())
	// make sure bandwidth is not smaller than pieceSize
	limit := rate.Limit(math.Max(float64(ts.totalRateLimit)/float64(nTasks), float64(pieceSize)))
	ptc.limiter.SetLimit(limit)
	ts.tasks[taskID] = &taskEntry{ptc: ptc, lastSecondBandwidth: atomic.NewInt64(0), pieceSize: pieceSize}
	var totalNeedRateLimit rate.Limit
	for _, te := range ts.tasks {
		totalNeedRateLimit += te.ptc.limiter.Limit()
	}
	ratio := ts.totalRateLimit / totalNeedRateLimit
	// reduce all running tasks' bandwidth
	for _, te := range ts.tasks {
		// make sure bandwidth is not smaller than pieceSize
		newLimit := math.Max(ratio*te.ptc.limiter.Limit(), rate.Limit(te.pieceSize))
		te.ptc.limiter.SetLimit(newLimit)
		ts.Debugf("a task added, task %s rate limit updated to %f", te.ptc.taskID, newLimit)
	}
}
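// Worked example with hypothetical numbers for AddTask above: with
// totalRateLimit = 100 and one existing task running at limit 100
// (pieceSize 10), a new task with pieceSize 10 first gets
// max(100/1, 10) = 100. The limits then sum to totalNeedRateLimit = 200,
// so ratio = 0.5 and both tasks are rescaled to max(0.5*100, 10) = 50.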

func (ts *samplingTrafficShaper) RemoveTask(taskID string) {
	ts.Lock()
	defer ts.Unlock()

	var limit rate.Limit
	if task, ok := ts.tasks[taskID]; ok {
		limit = task.ptc.limiter.Limit()
	} else {
		ts.Debugf("the task %s is already removed", taskID)
		return
	}

	delete(ts.tasks, taskID)
	ratio := ts.totalRateLimit / (ts.totalRateLimit - limit)
	// increase all running tasks' bandwidth
	for _, te := range ts.tasks {
		newLimit := ratio * te.ptc.limiter.Limit()
		te.ptc.limiter.SetLimit(newLimit)
		ts.Debugf("a task removed, task %s rate limit updated to %f", te.ptc.taskID, newLimit)
	}
}
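// Worked example with hypothetical numbers for RemoveTask above: with
// totalRateLimit = 100, removing a task whose limit was 50 gives
// ratio = 100/(100-50) = 2, so a remaining task at limit 50 is raised to 100.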

func (ts *samplingTrafficShaper) Record(taskID string, n int) {
	ts.usingBandWidth.Add(int64(n))
	ts.RLock()
	if task, ok := ts.tasks[taskID]; ok {
		task.lastSecondBandwidth.Add(int64(n))
	} else {
		ts.Warnf("the task %s is not found when recording it", taskID)
	}
	ts.RUnlock()
}

func (ts *samplingTrafficShaper) GetBandwidth() int64 {
	return ts.lastSecondBandwidth.Load()
}
