Dragonfly2
250 lines · 6.7 KB
1/*
2* Copyright 2020 The Dragonfly Authors
3*
4* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
7*
8* http://www.apache.org/licenses/LICENSE-2.0
9*
10* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
15*/
16
17//go:generate mockgen -destination peer_manager_mock.go -source peer_manager.go -package resource
18
19package resource
20
21import (
22"context"
23"sync"
24"time"
25
26pkggc "d7y.io/dragonfly/v2/pkg/gc"
27"d7y.io/dragonfly/v2/scheduler/config"
28)
29
const (
	// GCPeerID is the ID under which the peer GC task is registered
	// with the gc package.
	GCPeerID = "peer"
)
34
// PeerManager is the interface used for peer manager.
type PeerManager interface {
	// Load returns peer for a key.
	Load(string) (*Peer, bool)

	// Store sets peer.
	Store(*Peer)

	// LoadOrStore returns the peer for the key if present.
	// Otherwise, it stores and returns the given peer.
	// The loaded result is true if the peer was loaded, false if stored.
	LoadOrStore(*Peer) (*Peer, bool)

	// Delete deletes peer for a key.
	Delete(string)

	// Range calls f sequentially for each key and value present in the map.
	// If f returns false, range stops the iteration.
	Range(f func(any, any) bool)

	// RunGC tries to reclaim peers that have left, failed, timed out
	// downloading a piece, or outlived the peer/host TTL.
	RunGC() error
}
58
// peerManager contains content for peer manager.
type peerManager struct {
	// Peer sync map, keyed by peer ID.
	*sync.Map

	// peerTTL is time to live of peer; peers whose UpdatedAt is older
	// than this are reclaimed by RunGC.
	peerTTL time.Duration

	// hostTTL is time to live of host; peers whose host's UpdatedAt is
	// older than this are reclaimed by RunGC.
	hostTTL time.Duration

	// pieceDownloadTimeout is timeout of downloading piece; running
	// peers whose PieceUpdatedAt is older than this are made to leave.
	pieceDownloadTimeout time.Duration

	// mu serializes Store/LoadOrStore/Delete so the map and the
	// task/host back-references are updated together.
	mu *sync.Mutex
}
76
77// New peer manager interface.
78func newPeerManager(cfg *config.GCConfig, gc pkggc.GC) (PeerManager, error) {
79p := &peerManager{
80Map: &sync.Map{},
81peerTTL: cfg.PeerTTL,
82hostTTL: cfg.HostTTL,
83pieceDownloadTimeout: cfg.PieceDownloadTimeout,
84mu: &sync.Mutex{},
85}
86
87if err := gc.Add(pkggc.Task{
88ID: GCPeerID,
89Interval: cfg.PeerGCInterval,
90Timeout: cfg.PeerGCInterval,
91Runner: p,
92}); err != nil {
93return nil, err
94}
95
96return p, nil
97}
98
99// Load returns peer for a key.
100func (p *peerManager) Load(key string) (*Peer, bool) {
101rawPeer, loaded := p.Map.Load(key)
102if !loaded {
103return nil, false
104}
105
106return rawPeer.(*Peer), loaded
107}
108
109// Store sets peer.
110func (p *peerManager) Store(peer *Peer) {
111p.mu.Lock()
112defer p.mu.Unlock()
113
114p.Map.Store(peer.ID, peer)
115peer.Task.StorePeer(peer)
116peer.Host.StorePeer(peer)
117}
118
119// LoadOrStore returns peer the key if present.
120// Otherwise, it stores and returns the given peer.
121// The loaded result is true if the peer was loaded, false if stored.
122func (p *peerManager) LoadOrStore(peer *Peer) (*Peer, bool) {
123p.mu.Lock()
124defer p.mu.Unlock()
125
126rawPeer, loaded := p.Map.LoadOrStore(peer.ID, peer)
127if !loaded {
128peer.Host.StorePeer(peer)
129peer.Task.StorePeer(peer)
130}
131
132return rawPeer.(*Peer), loaded
133}
134
135// Delete deletes peer for a key.
136func (p *peerManager) Delete(key string) {
137p.mu.Lock()
138defer p.mu.Unlock()
139
140if peer, loaded := p.Load(key); loaded {
141p.Map.Delete(key)
142peer.Task.DeletePeer(key)
143peer.Host.DeletePeer(key)
144}
145}
146
147// Range calls f sequentially for each key and value present in the map.
148// If f returns false, range stops the iteration.
149func (p *peerManager) Range(f func(key, value any) bool) {
150p.Map.Range(f)
151}
152
153// Try to reclaim peer.
154func (p *peerManager) RunGC() error {
155p.Map.Range(func(_, value any) bool {
156peer, ok := value.(*Peer)
157if !ok {
158peer.Log.Warn("invalid peer")
159return true
160}
161
162// If the peer state is PeerStateLeave,
163// peer will be reclaimed.
164if peer.FSM.Is(PeerStateLeave) {
165p.Delete(peer.ID)
166peer.Log.Info("peer has been reclaimed")
167return true
168}
169
170// If the peer's elapsed of downloading piece exceeds the pieceDownloadTimeout,
171// then sets the peer state to PeerStateLeave and then delete peer.
172if peer.FSM.Is(PeerStateRunning) || peer.FSM.Is(PeerStateBackToSource) {
173elapsed := time.Since(peer.PieceUpdatedAt.Load())
174if elapsed > p.pieceDownloadTimeout {
175peer.Log.Info("peer elapsed exceeds the timeout of downloading piece, causing the peer to leave")
176if err := peer.FSM.Event(context.Background(), PeerEventLeave); err != nil {
177peer.Log.Errorf("peer fsm event failed: %s", err.Error())
178return true
179}
180
181return true
182}
183}
184
185// If the peer's elapsed exceeds the peer ttl,
186// then set the peer state to PeerStateLeave and then delete peer.
187elapsed := time.Since(peer.UpdatedAt.Load())
188if elapsed > p.peerTTL {
189peer.Log.Info("peer elapsed exceeds the peer ttl, causing the peer to leave")
190if err := peer.FSM.Event(context.Background(), PeerEventLeave); err != nil {
191peer.Log.Errorf("peer fsm event failed: %s", err.Error())
192return true
193}
194
195return true
196}
197
198// If the host's elapsed exceeds the host ttl,
199// then set the peer state to PeerStateLeave and then delete peer.
200elapsed = time.Since(peer.Host.UpdatedAt.Load())
201if elapsed > p.hostTTL {
202peer.Log.Info("peer elapsed exceeds the host ttl, causing the peer to leave")
203if err := peer.FSM.Event(context.Background(), PeerEventLeave); err != nil {
204peer.Log.Errorf("peer fsm event failed: %s", err.Error())
205return true
206}
207
208return true
209}
210
211// If the peer's state is PeerStateFailed,
212// then set the peer state to PeerStateLeave and then delete peer.
213if peer.FSM.Is(PeerStateFailed) {
214peer.Log.Info("peer state is PeerStateFailed, causing the peer to leave")
215if err := peer.FSM.Event(context.Background(), PeerEventLeave); err != nil {
216peer.Log.Errorf("peer fsm event failed: %s", err.Error())
217return true
218}
219}
220
221// If no peer exists in the dag of the task,
222// delete the peer.
223degree, err := peer.Task.PeerDegree(peer.ID)
224if err != nil {
225p.Delete(peer.ID)
226peer.Log.Info("peer has been reclaimed")
227return true
228}
229
230// If the task dag size exceeds the limit,
231// then set the peer state to PeerStateLeave which state is
232// PeerStateSucceeded, and degree is zero.
233if peer.Task.PeerCount() > PeerCountLimitForTask &&
234peer.FSM.Is(PeerStateSucceeded) && degree == 0 {
235peer.Log.Info("task dag size exceeds the limit, causing the peer to leave")
236if err := peer.FSM.Event(context.Background(), PeerEventLeave); err != nil {
237peer.Log.Errorf("peer fsm event failed: %s", err.Error())
238return true
239}
240
241p.Delete(peer.ID)
242peer.Log.Info("peer has been reclaimed")
243return true
244}
245
246return true
247})
248
249return nil
250}
251