/*
 * Copyright 2022 The Dragonfly Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package peer

import (
	"cmp"
	"errors"
	"math/rand"
	"sync"
	"time"

	"go.uber.org/atomic"
	"golang.org/x/exp/maps"
	"golang.org/x/exp/slices"

	logger "d7y.io/dragonfly/v2/internal/dflog"
)

type PieceDispatcher interface {
	// Put is called by the piece synchronizer to add a piece request to the PieceDispatcher
	Put(req *DownloadPieceRequest)
	// Get is called by the downloader to fetch a piece request from the PieceDispatcher
	Get() (req *DownloadPieceRequest, err error)
	// Report is called by the downloader to report a piece download result, so that the PieceDispatcher can score peers
	Report(result *DownloadPieceResult)
	// Close releases related resources; Put and Get are no longer accepted afterwards
	Close()
}
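
// The sketch below illustrates how a producer and a consumer might drive a
// PieceDispatcher. It is an illustrative example, not part of the dispatcher
// itself, and downloadPiece is a hypothetical helper that performs the actual
// download and builds a *DownloadPieceResult.
//
//	pd := NewPieceDispatcher(0.1, log)
//	// producer: the piece synchronizer feeds requests
//	pd.Put(req)
//	// consumer: the downloader drains requests and reports results
//	for {
//		req, err := pd.Get()
//		if errors.Is(err, ErrNoValidPieceTemporarily) {
//			continue // all pending requests were already downloaded
//		}
//		if err != nil {
//			break // dispatcher closed
//		}
//		pd.Report(downloadPiece(req)) // hypothetical helper
//	}
//	pd.Close()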

var ErrNoValidPieceTemporarily = errors.New("no valid piece temporarily")

type pieceDispatcher struct {
	// peerRequests holds the pending piece requests of each peer. The key is the peer ID, the value is its piece requests
	peerRequests map[string][]*DownloadPieceRequest
	// score holds the score of each peer
	score map[string]int64
	// downloaded holds the numbers of the pieces that have already been downloaded successfully
	downloaded map[int32]struct{}
	// sum is the number of valid piece requests. When sum == 0, the consumer waits until a request is put
	sum         *atomic.Int64
	closed      bool
	cond        *sync.Cond
	lock        *sync.Mutex
	log         *logger.SugaredLoggerOnWith
	randomRatio float64
	// rand is not thread-safe
	rand *rand.Rand
}

var (
	// the lower, the better
	maxScore = int64(0)
	minScore = (60 * time.Second).Nanoseconds()
)

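// NewPieceDispatcher constructs a pieceDispatcher. randomRatio controls the
// trade-off between exploration and exploitation when choosing the next
// destination peer: with probability randomRatio peers are tried in random
// order, otherwise in ascending score order (lower is better); see
// getDesiredReq.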
func NewPieceDispatcher(randomRatio float64, log *logger.SugaredLoggerOnWith) PieceDispatcher {
	lock := &sync.Mutex{}
	pd := &pieceDispatcher{
		peerRequests: map[string][]*DownloadPieceRequest{},
		score:        map[string]int64{},
		downloaded:   map[int32]struct{}{},
		sum:          atomic.NewInt64(0),
		closed:       false,
		cond:         sync.NewCond(lock),
		lock:         lock,
		log:          log.With("component", "pieceDispatcher"),
		randomRatio:  randomRatio,
		rand:         rand.New(rand.NewSource(time.Now().Unix())),
	}
	log.Debugf("piece dispatcher created")
	return pd
}

func (p *pieceDispatcher) Put(req *DownloadPieceRequest) {
	p.lock.Lock()
	defer p.lock.Unlock()
	if reqs, ok := p.peerRequests[req.DstPid]; ok {
		p.peerRequests[req.DstPid] = append(reqs, req)
	} else {
		p.peerRequests[req.DstPid] = []*DownloadPieceRequest{req}
	}
	if _, ok := p.score[req.DstPid]; !ok {
		p.score[req.DstPid] = maxScore
	}
	p.sum.Add(1)
	p.cond.Broadcast()
}

func (p *pieceDispatcher) Get() (req *DownloadPieceRequest, err error) {
	p.lock.Lock()
	defer p.lock.Unlock()
	for p.sum.Load() == 0 && !p.closed {
		p.cond.Wait()
	}
	if p.closed {
		return nil, errors.New("piece dispatcher already closed")
	}
	return p.getDesiredReq()
}

// getDesiredReq returns a request according to the performance of each destination peer. It is not thread-safe.
func (p *pieceDispatcher) getDesiredReq() (*DownloadPieceRequest, error) {
	distPeerIDs := maps.Keys(p.score)
	if p.rand.Float64() < p.randomRatio { // shuffle randomly with probability randomRatio
		p.rand.Shuffle(len(distPeerIDs), func(i, j int) {
			distPeerIDs[i], distPeerIDs[j] = distPeerIDs[j], distPeerIDs[i]
		})
	} else { // sort by score with probability (1 - randomRatio)
		slices.SortFunc(distPeerIDs, func(p1, p2 string) int { return cmp.Compare(p.score[p1], p.score[p2]) })
	}

	// iterate over all peers until a valid piece request is found
	for _, peer := range distPeerIDs {
		for len(p.peerRequests[peer]) > 0 {
			// choose a random piece request of this peer
			n := p.rand.Intn(len(p.peerRequests[peer]))
			req := p.peerRequests[peer][n]
			p.peerRequests[peer] = append(p.peerRequests[peer][0:n], p.peerRequests[peer][n+1:]...)
			p.sum.Sub(1)
			if _, ok := p.downloaded[req.piece.PieceNum]; ok { // already downloaded, skip
				// p.log.Debugf("skip already downloaded piece, peer: %s, piece: %d", peer, req.piece.PieceNum)
				continue
			}
			// p.log.Debugf("scores: %v, select: %s, piece: %v", p.score, peer, req.piece.PieceNum)
			return req, nil
		}
	}
	return nil, ErrNoValidPieceTemporarily
}

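// Scoring sketch (illustrative numbers): scores are smoothed download
// durations in nanoseconds. A peer that starts at maxScore (0) and serves a
// piece in 100ms moves to (0 + 100ms)/2 = 50ms, while a subsequent failure
// moves it to (50ms + 60s)/2, about 30s, pushing it towards minScore and
// behind faster peers in getDesiredReq's sorted order.
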
// Report scores the peer according to the download result reported by the downloader.
// The score of a peer is not determined solely by the last downloaded piece; it is smoothed.
func (p *pieceDispatcher) Report(result *DownloadPieceResult) {
	p.lock.Lock()
	defer p.lock.Unlock()
	if result == nil || result.DstPeerID == "" {
		return
	}
	lastScore := p.score[result.DstPeerID]
	if result.Fail {
		p.score[result.DstPeerID] = (lastScore + minScore) / 2
	} else {
		if result.pieceInfo != nil {
			p.downloaded[result.pieceInfo.PieceNum] = struct{}{}
		}
		p.score[result.DstPeerID] = (lastScore + result.FinishTime - result.BeginTime) / 2
	}
}

func (p *pieceDispatcher) Close() {
	p.lock.Lock()
	p.closed = true
	p.cond.Broadcast()
	p.log.Debugf("piece dispatcher closed")
	p.lock.Unlock()
}