Dragonfly2
273 строки · 8.0 Кб
1/*
2* Copyright 2022 The Dragonfly Authors
3*
4* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
7*
8* http://www.apache.org/licenses/LICENSE-2.0
9*
10* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
15*/
16
17package peer
18
19import (
20"sync"
21"time"
22
23"go.uber.org/atomic"
24"golang.org/x/time/rate"
25
26logger "d7y.io/dragonfly/v2/internal/dflog"
27"d7y.io/dragonfly/v2/pkg/math"
28)
29
// Names of the available TrafficShaper implementations, as accepted by the
// trafficShaperType argument of NewTrafficShaper.
const (
	// TypePlainTrafficShaper selects the shaper built by NewPlainTrafficShaper.
	TypePlainTrafficShaper = "plain"
	// TypeSamplingTrafficShaper selects the shaper built by NewSamplingTrafficShaper.
	TypeSamplingTrafficShaper = "sampling"
)
34
// TrafficShaper allocates bandwidth for running tasks dynamically.
type TrafficShaper interface {
	// Start starts the TrafficShaper.
	Start()
	// Stop stops the TrafficShaper.
	Stop()
	// AddTask starts managing the new task identified by taskID.
	AddTask(taskID string, ptc *peerTaskConductor)
	// RemoveTask stops managing the task identified by taskID, e.g. once it
	// has completed.
	RemoveTask(taskID string)
	// Record adds n to the bandwidth used by the task identified by taskID
	// (n is presumably a byte count — confirm against callers).
	Record(taskID string, n int)
	// GetBandwidth returns the total download bandwidth used in the past second.
	GetBandwidth() int64
}
50
51func NewTrafficShaper(trafficShaperType string, totalRateLimit rate.Limit, computePieceSize func(int64) uint32) TrafficShaper {
52var ts TrafficShaper
53switch trafficShaperType {
54case TypeSamplingTrafficShaper:
55ts = NewSamplingTrafficShaper(totalRateLimit, computePieceSize)
56case TypePlainTrafficShaper:
57ts = NewPlainTrafficShaper()
58default:
59logger.Warnf("type \"%s\" doesn't exist, use plain traffic shaper instead", trafficShaperType)
60ts = NewPlainTrafficShaper()
61}
62return ts
63}
64
// plainTrafficShaper is a TrafficShaper that only measures total bandwidth;
// its AddTask and RemoveTask are no-ops and it never changes any task's limit.
type plainTrafficShaper struct {
	// total used bandwidth in the past second, published once per second by
	// the goroutine launched in Start and returned by GetBandwidth
	lastSecondBandwidth *atomic.Int64
	// total used bandwidth in the current second, accumulated by Record
	usingBandWidth *atomic.Int64
	// closed by Stop to make the goroutine launched in Start return
	stopCh chan struct{}
}
72
73func NewPlainTrafficShaper() TrafficShaper {
74return &plainTrafficShaper{
75lastSecondBandwidth: atomic.NewInt64(0),
76usingBandWidth: atomic.NewInt64(0),
77stopCh: make(chan struct{}),
78}
79}
80
81func (ts *plainTrafficShaper) Start() {
82go func() {
83ticker := time.NewTicker(time.Second)
84defer ticker.Stop()
85for {
86select {
87case <-ticker.C:
88ts.lastSecondBandwidth.Store(ts.usingBandWidth.Load())
89ts.usingBandWidth.Store(0)
90case <-ts.stopCh:
91return
92}
93}
94}()
95}
96
// Stop closes stopCh, which makes the goroutine launched by Start return.
// Closing an already-closed channel panics, so Stop must be called at most once.
func (ts *plainTrafficShaper) Stop() {
	close(ts.stopCh)
}
100
101func (ts *plainTrafficShaper) AddTask(_ string, _ *peerTaskConductor) {
102}
103
104func (ts *plainTrafficShaper) RemoveTask(_ string) {
105}
106
107func (ts *plainTrafficShaper) Record(_ string, n int) {
108ts.usingBandWidth.Add(int64(n))
109}
110
// GetBandwidth returns the total bandwidth recorded during the last completed
// one-second window, as published by the goroutine launched in Start.
func (ts *plainTrafficShaper) GetBandwidth() int64 {
	return ts.lastSecondBandwidth.Load()
}
114
// taskEntry is the per-task bookkeeping kept by samplingTrafficShaper.
type taskEntry struct {
	// conductor of the managed task; its limiter is the one being adjusted
	ptc *peerTaskConductor
	// result of computePieceSize for the task, used as the lower bound of its rate limit
	pieceSize uint32
	// used bandwidth in the past second, accumulated by Record and reset by updateLimit
	lastSecondBandwidth *atomic.Int64
	// bandwidth needed in the next second on top of pieceSize, computed by updateLimit
	needBandwidth int64
	// indicates if the bandwidth needs to be updated; it is false for a freshly
	// added entry, whose limit is not reduced below its current value on the
	// first updateLimit pass
	needUpdate bool
}
125
// samplingTrafficShaper is a TrafficShaper that samples each task's bandwidth
// usage every second and redistributes totalRateLimit among the tasks
// according to that usage.
type samplingTrafficShaper struct {
	// embedded logger; supplies the Debugf/Warnf methods used by this type
	*logger.SugaredLoggerOnWith
	// guards tasks: AddTask/RemoveTask take the write lock, Record and
	// updateLimit take the read lock
	sync.RWMutex
	// computes a task's piece size from its content length
	computePieceSize func(int64) uint32
	// bandwidth budget shared by all managed tasks
	totalRateLimit rate.Limit
	// total used bandwidth in the past second
	lastSecondBandwidth *atomic.Int64
	// total used bandwidth in the current second
	usingBandWidth *atomic.Int64
	// managed tasks keyed by task ID
	tasks map[string]*taskEntry
	// closed by Stop to make the goroutine launched in Start return
	stopCh chan struct{}
}
138
139func NewSamplingTrafficShaper(totalRateLimit rate.Limit, computePieceSize func(int64) uint32) TrafficShaper {
140log := logger.With("component", "TrafficShaper")
141return &samplingTrafficShaper{
142SugaredLoggerOnWith: log,
143computePieceSize: computePieceSize,
144totalRateLimit: totalRateLimit,
145lastSecondBandwidth: atomic.NewInt64(0),
146usingBandWidth: atomic.NewInt64(0),
147tasks: make(map[string]*taskEntry),
148stopCh: make(chan struct{}),
149}
150}
151
152func (ts *samplingTrafficShaper) Start() {
153go func() {
154// update bandwidth of all running tasks every second
155ticker := time.NewTicker(time.Second)
156defer ticker.Stop()
157for {
158select {
159case <-ticker.C:
160ts.lastSecondBandwidth.Store(ts.usingBandWidth.Load())
161ts.usingBandWidth.Store(0)
162ts.updateLimit()
163case <-ts.stopCh:
164return
165}
166}
167}()
168}
169
// Stop closes stopCh, which makes the goroutine launched by Start return and
// thereby ends the periodic limit updates. Closing an already-closed channel
// panics, so Stop must be called at most once.
func (ts *samplingTrafficShaper) Stop() {
	close(ts.stopCh)
}
173
174// updateLimit updates every task's limit every second
175func (ts *samplingTrafficShaper) updateLimit() {
176var totalNeedBandwidth int64
177var totalLeastBandwidth int64
178ts.RLock()
179defer ts.RUnlock()
180// compute overall remaining length of all tasks
181for _, te := range ts.tasks {
182oldLimit := int64(te.ptc.limiter.Limit())
183needBandwidth := te.lastSecondBandwidth.Swap(0)
184if !te.needUpdate {
185// if this task is added within 1 second, don't reduce its limit this time
186te.needUpdate = true
187needBandwidth = math.Max(needBandwidth, oldLimit)
188}
189if contentLength := te.ptc.contentLength.Load(); contentLength > 0 {
190remainingLength := contentLength - te.ptc.completedLength.Load()
191needBandwidth = math.Min(remainingLength, needBandwidth)
192}
193// delta bandwidth, make sure it's larger than 0
194needBandwidth = math.Max(needBandwidth-int64(te.pieceSize), int64(0))
195te.needBandwidth = needBandwidth
196totalNeedBandwidth += needBandwidth
197totalLeastBandwidth += int64(te.pieceSize)
198}
199
200// allocate delta bandwidth for tasks
201for _, te := range ts.tasks {
202// diffLimit indicates the difference between the allocated bandwidth and pieceSize
203var diffLimit float64
204// make sure new limit is not smaller than pieceSize
205diffLimit = math.Max(
206(float64(ts.totalRateLimit)-float64(totalLeastBandwidth))*(float64(te.needBandwidth)/float64(totalNeedBandwidth)), 0)
207te.ptc.limiter.SetLimit(rate.Limit(diffLimit + float64(te.pieceSize)))
208ts.Debugf("period update limit, task %s, need bandwidth %d, diff rate limit %f", te.ptc.taskID, te.needBandwidth, diffLimit)
209}
210}
211
212func (ts *samplingTrafficShaper) AddTask(taskID string, ptc *peerTaskConductor) {
213ts.Lock()
214defer ts.Unlock()
215nTasks := len(ts.tasks)
216if nTasks == 0 {
217nTasks++
218}
219pieceSize := ts.computePieceSize(ptc.contentLength.Load())
220limit := rate.Limit(math.Max(float64(ts.totalRateLimit)/float64(nTasks), float64(pieceSize)))
221// make sure bandwidth is not smaller than pieceSize
222ptc.limiter.SetLimit(limit)
223ts.tasks[taskID] = &taskEntry{ptc: ptc, lastSecondBandwidth: atomic.NewInt64(0), pieceSize: pieceSize}
224var totalNeedRateLimit rate.Limit
225for _, te := range ts.tasks {
226totalNeedRateLimit += te.ptc.limiter.Limit()
227}
228ratio := ts.totalRateLimit / totalNeedRateLimit
229// reduce all running tasks' bandwidth
230for _, te := range ts.tasks {
231// make sure bandwidth is not smaller than pieceSize
232newLimit := math.Max(ratio*te.ptc.limiter.Limit(), rate.Limit(te.pieceSize))
233te.ptc.limiter.SetLimit(newLimit)
234ts.Debugf("a task added, task %s rate limit updated to %f", te.ptc.taskID, newLimit)
235}
236}
237
238func (ts *samplingTrafficShaper) RemoveTask(taskID string) {
239ts.Lock()
240defer ts.Unlock()
241
242var limit rate.Limit
243if task, ok := ts.tasks[taskID]; ok {
244limit = task.ptc.limiter.Limit()
245} else {
246ts.Debugf("the task %s is already removed", taskID)
247return
248}
249
250delete(ts.tasks, taskID)
251ratio := ts.totalRateLimit / (ts.totalRateLimit - limit)
252// increase all running tasks' bandwidth
253for _, te := range ts.tasks {
254newLimit := ratio * te.ptc.limiter.Limit()
255te.ptc.limiter.SetLimit(newLimit)
256ts.Debugf("a task removed, task %s rate limit updated to %f", te.ptc.taskID, newLimit)
257}
258}
259
260func (ts *samplingTrafficShaper) Record(taskID string, n int) {
261ts.usingBandWidth.Add(int64(n))
262ts.RLock()
263if task, ok := ts.tasks[taskID]; ok {
264task.lastSecondBandwidth.Add(int64(n))
265} else {
266ts.Warnf("the task %s is not found when record it", taskID)
267}
268ts.RUnlock()
269}
270
// GetBandwidth returns the total bandwidth recorded during the last completed
// one-second window, as published by the goroutine launched in Start.
func (ts *samplingTrafficShaper) GetBandwidth() int64 {
	return ts.lastSecondBandwidth.Load()
}
274