Dragonfly2

Форк
0
/
piece_dispatcher.go 
174 строки · 5.4 Кб
1
/*
2
 *     Copyright 2022 The Dragonfly Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *      http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 *
16
 */
17

18
package peer
19

20
import (
21
	"cmp"
22
	"errors"
23
	"math/rand"
24
	"sync"
25
	"time"
26

27
	"go.uber.org/atomic"
28
	"golang.org/x/exp/maps"
29
	"golang.org/x/exp/slices"
30

31
	logger "d7y.io/dragonfly/v2/internal/dflog"
32
)
33

34
type PieceDispatcher interface {
35
	// Put pieceSynchronizer put piece request into PieceDispatcher
36
	Put(req *DownloadPieceRequest)
37
	// Get downloader will get piece request from PieceDispatcher
38
	Get() (req *DownloadPieceRequest, err error)
39
	// Report downloader will report piece download result to PieceDispatcher, so PieceDispatcher can score peers
40
	Report(result *DownloadPieceResult)
41
	// Close related resources, and not accept Put and Get anymore
42
	Close()
43
}
44

45
var ErrNoValidPieceTemporarily = errors.New("no valid piece temporarily")
46

47
type pieceDispatcher struct {
48
	// peerRequests hold piece requests of peers. Key is PeerID, value is piece requests
49
	peerRequests map[string][]*DownloadPieceRequest
50
	// score hold the score of each peer.
51
	score map[string]int64
52
	// downloaded hold the already successfully downloaded piece num
53
	downloaded map[int32]struct{}
54
	// sum is the valid num of piece requests. When sum == 0, the consumer will wait until there is a request is putted
55
	sum         *atomic.Int64
56
	closed      bool
57
	cond        *sync.Cond
58
	lock        *sync.Mutex
59
	log         *logger.SugaredLoggerOnWith
60
	randomRatio float64
61
	// rand is not thread-safe
62
	rand *rand.Rand
63
}
64

65
var (
66
	// the lower, the better
67
	maxScore = int64(0)
68
	minScore = (60 * time.Second).Nanoseconds()
69
)
70

71
func NewPieceDispatcher(randomRatio float64, log *logger.SugaredLoggerOnWith) PieceDispatcher {
72
	lock := &sync.Mutex{}
73
	pd := &pieceDispatcher{
74
		peerRequests: map[string][]*DownloadPieceRequest{},
75
		score:        map[string]int64{},
76
		downloaded:   map[int32]struct{}{},
77
		sum:          atomic.NewInt64(0),
78
		closed:       false,
79
		cond:         sync.NewCond(lock),
80
		lock:         lock,
81
		log:          log.With("component", "pieceDispatcher"),
82
		randomRatio:  randomRatio,
83
		rand:         rand.New(rand.NewSource(time.Now().Unix())),
84
	}
85
	log.Debugf("piece dispatcher created")
86
	return pd
87
}
88

89
func (p *pieceDispatcher) Put(req *DownloadPieceRequest) {
90
	p.lock.Lock()
91
	defer p.lock.Unlock()
92
	if reqs, ok := p.peerRequests[req.DstPid]; ok {
93
		p.peerRequests[req.DstPid] = append(reqs, req)
94
	} else {
95
		p.peerRequests[req.DstPid] = []*DownloadPieceRequest{req}
96
	}
97
	if _, ok := p.score[req.DstPid]; !ok {
98
		p.score[req.DstPid] = maxScore
99
	}
100
	p.sum.Add(1)
101
	p.cond.Broadcast()
102
}
103

104
func (p *pieceDispatcher) Get() (req *DownloadPieceRequest, err error) {
105
	p.lock.Lock()
106
	defer p.lock.Unlock()
107
	for p.sum.Load() == 0 && !p.closed {
108
		p.cond.Wait()
109
	}
110
	if p.closed {
111
		return nil, errors.New("piece dispatcher already closed")
112
	}
113
	return p.getDesiredReq()
114
}
115

116
// getDesiredReq return a req according to performance of each dest peer. It is not thread-safe
117
func (p *pieceDispatcher) getDesiredReq() (*DownloadPieceRequest, error) {
118
	distPeerIDs := maps.Keys(p.score)
119
	if p.rand.Float64() < p.randomRatio { //random shuffle with the probability of randomRatio
120
		p.rand.Shuffle(len(distPeerIDs), func(i, j int) {
121
			tmp := distPeerIDs[j]
122
			distPeerIDs[j] = distPeerIDs[i]
123
			distPeerIDs[i] = tmp
124
		})
125
	} else { // sort by score with the probability of (1-randomRatio)
126
		slices.SortFunc(distPeerIDs, func(p1, p2 string) int { return cmp.Compare(p.score[p1], p.score[p2]) })
127
	}
128

129
	// iterate all peers, until get a valid piece requests
130
	for _, peer := range distPeerIDs {
131
		for len(p.peerRequests[peer]) > 0 {
132
			// choose a random piece request of a peer
133
			n := p.rand.Intn(len(p.peerRequests[peer]))
134
			req := p.peerRequests[peer][n]
135
			p.peerRequests[peer] = append(p.peerRequests[peer][0:n], p.peerRequests[peer][n+1:]...)
136
			p.sum.Sub(1)
137
			if _, ok := p.downloaded[req.piece.PieceNum]; ok { //already downloaded, skip
138
				// p.log.Debugf("skip already downloaded piece , peer: %s, piece:%d", peer, req.piece.PieceNum)
139
				continue
140
			}
141
			// p.log.Debugf("scores :%v, select :%s, piece:%v", p.score, peer, req.piece.PieceNum)
142
			return req, nil
143
		}
144
	}
145
	return nil, ErrNoValidPieceTemporarily
146
}
147

148
// Report pieceDispatcher will score peer according to the download result reported by downloader
149
// The score of peer is not determined only by last piece downloaded, it is smoothed.
150
func (p *pieceDispatcher) Report(result *DownloadPieceResult) {
151
	p.lock.Lock()
152
	defer p.lock.Unlock()
153
	if result == nil || result.DstPeerID == "" {
154
		return
155
	}
156
	lastScore := p.score[result.DstPeerID]
157
	if result.Fail {
158
		p.score[result.DstPeerID] = (lastScore + minScore) / 2
159
	} else {
160
		if result.pieceInfo != nil {
161
			p.downloaded[result.pieceInfo.PieceNum] = struct{}{}
162
		}
163
		p.score[result.DstPeerID] = (lastScore + result.FinishTime - result.BeginTime) / 2
164
	}
165
	return
166
}
167

168
func (p *pieceDispatcher) Close() {
169
	p.lock.Lock()
170
	p.closed = true
171
	p.cond.Broadcast()
172
	p.log.Debugf("piece dispatcher closed")
173
	p.lock.Unlock()
174
}
175

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.