Dragonfly2

Форк
0
/
peer_manager.go 
250 строк · 6.7 Кб
1
/*
2
 *     Copyright 2020 The Dragonfly Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *      http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
//go:generate mockgen -destination peer_manager_mock.go -source peer_manager.go -package resource
18

19
package resource
20

21
import (
22
	"context"
23
	"sync"
24
	"time"
25

26
	pkggc "d7y.io/dragonfly/v2/pkg/gc"
27
	"d7y.io/dragonfly/v2/scheduler/config"
28
)
29

30
const (
	// GCPeerID is the ID under which the peer GC task is registered
	// with the gc package (see newPeerManager).
	GCPeerID = "peer"
)
34

35
// PeerManager is the interface used for peer manager.
type PeerManager interface {
	// Load returns peer for a key.
	Load(string) (*Peer, bool)

	// Store sets peer.
	Store(*Peer)

	// LoadOrStore returns the peer for the key if present.
	// Otherwise, it stores and returns the given peer.
	// The loaded result is true if the peer was loaded, false if stored.
	LoadOrStore(*Peer) (*Peer, bool)

	// Delete deletes peer for a key.
	Delete(string)

	// Range calls f sequentially for each key and value present in the map.
	// If f returns false, range stops the iteration.
	Range(f func(any, any) bool)

	// RunGC tries to reclaim peers; it is invoked periodically by the gc package.
	RunGC() error
}
58

59
// peerManager contains content for peer manager.
type peerManager struct {
	// Peer sync map, keyed by peer ID.
	*sync.Map

	// peerTTL is time to live of peer.
	peerTTL time.Duration

	// hostTTL is time to live of host.
	hostTTL time.Duration

	// pieceDownloadTimeout is timeout of downloading piece.
	pieceDownloadTimeout time.Duration

	// mu is peer mutex. It serializes Store, LoadOrStore and Delete so the
	// map entry and the task/host back references change atomically.
	mu *sync.Mutex
}
76

77
// New peer manager interface.
78
func newPeerManager(cfg *config.GCConfig, gc pkggc.GC) (PeerManager, error) {
79
	p := &peerManager{
80
		Map:                  &sync.Map{},
81
		peerTTL:              cfg.PeerTTL,
82
		hostTTL:              cfg.HostTTL,
83
		pieceDownloadTimeout: cfg.PieceDownloadTimeout,
84
		mu:                   &sync.Mutex{},
85
	}
86

87
	if err := gc.Add(pkggc.Task{
88
		ID:       GCPeerID,
89
		Interval: cfg.PeerGCInterval,
90
		Timeout:  cfg.PeerGCInterval,
91
		Runner:   p,
92
	}); err != nil {
93
		return nil, err
94
	}
95

96
	return p, nil
97
}
98

99
// Load returns peer for a key.
100
func (p *peerManager) Load(key string) (*Peer, bool) {
101
	rawPeer, loaded := p.Map.Load(key)
102
	if !loaded {
103
		return nil, false
104
	}
105

106
	return rawPeer.(*Peer), loaded
107
}
108

109
// Store sets peer. The peer is inserted into the manager's map and
// registered with its associated task and host under p.mu, so the
// three structures stay consistent with each other.
func (p *peerManager) Store(peer *Peer) {
	p.mu.Lock()
	defer p.mu.Unlock()

	p.Map.Store(peer.ID, peer)
	peer.Task.StorePeer(peer)
	peer.Host.StorePeer(peer)
}
118

119
// LoadOrStore returns peer the key if present.
120
// Otherwise, it stores and returns the given peer.
121
// The loaded result is true if the peer was loaded, false if stored.
122
func (p *peerManager) LoadOrStore(peer *Peer) (*Peer, bool) {
123
	p.mu.Lock()
124
	defer p.mu.Unlock()
125

126
	rawPeer, loaded := p.Map.LoadOrStore(peer.ID, peer)
127
	if !loaded {
128
		peer.Host.StorePeer(peer)
129
		peer.Task.StorePeer(peer)
130
	}
131

132
	return rawPeer.(*Peer), loaded
133
}
134

135
// Delete deletes peer for a key.
136
func (p *peerManager) Delete(key string) {
137
	p.mu.Lock()
138
	defer p.mu.Unlock()
139

140
	if peer, loaded := p.Load(key); loaded {
141
		p.Map.Delete(key)
142
		peer.Task.DeletePeer(key)
143
		peer.Host.DeletePeer(key)
144
	}
145
}
146

147
// Range calls f sequentially for each key and value present in the map.
// If f returns false, range stops the iteration. Keys are peer IDs
// (string) and values are *Peer; iteration order is unspecified.
func (p *peerManager) Range(f func(key, value any) bool) {
	p.Map.Range(f)
}
152

153
// Try to reclaim peer.
154
func (p *peerManager) RunGC() error {
155
	p.Map.Range(func(_, value any) bool {
156
		peer, ok := value.(*Peer)
157
		if !ok {
158
			peer.Log.Warn("invalid peer")
159
			return true
160
		}
161

162
		// If the peer state is PeerStateLeave,
163
		// peer will be reclaimed.
164
		if peer.FSM.Is(PeerStateLeave) {
165
			p.Delete(peer.ID)
166
			peer.Log.Info("peer has been reclaimed")
167
			return true
168
		}
169

170
		// If the peer's elapsed of downloading piece exceeds the pieceDownloadTimeout,
171
		// then sets the peer state to PeerStateLeave and then delete peer.
172
		if peer.FSM.Is(PeerStateRunning) || peer.FSM.Is(PeerStateBackToSource) {
173
			elapsed := time.Since(peer.PieceUpdatedAt.Load())
174
			if elapsed > p.pieceDownloadTimeout {
175
				peer.Log.Info("peer elapsed exceeds the timeout of downloading piece, causing the peer to leave")
176
				if err := peer.FSM.Event(context.Background(), PeerEventLeave); err != nil {
177
					peer.Log.Errorf("peer fsm event failed: %s", err.Error())
178
					return true
179
				}
180

181
				return true
182
			}
183
		}
184

185
		// If the peer's elapsed exceeds the peer ttl,
186
		// then set the peer state to PeerStateLeave and then delete peer.
187
		elapsed := time.Since(peer.UpdatedAt.Load())
188
		if elapsed > p.peerTTL {
189
			peer.Log.Info("peer elapsed exceeds the peer ttl, causing the peer to leave")
190
			if err := peer.FSM.Event(context.Background(), PeerEventLeave); err != nil {
191
				peer.Log.Errorf("peer fsm event failed: %s", err.Error())
192
				return true
193
			}
194

195
			return true
196
		}
197

198
		// If the host's elapsed exceeds the host ttl,
199
		// then set the peer state to PeerStateLeave and then delete peer.
200
		elapsed = time.Since(peer.Host.UpdatedAt.Load())
201
		if elapsed > p.hostTTL {
202
			peer.Log.Info("peer elapsed exceeds the host ttl, causing the peer to leave")
203
			if err := peer.FSM.Event(context.Background(), PeerEventLeave); err != nil {
204
				peer.Log.Errorf("peer fsm event failed: %s", err.Error())
205
				return true
206
			}
207

208
			return true
209
		}
210

211
		// If the peer's state is PeerStateFailed,
212
		// then set the peer state to PeerStateLeave and then delete peer.
213
		if peer.FSM.Is(PeerStateFailed) {
214
			peer.Log.Info("peer state is PeerStateFailed, causing the peer to leave")
215
			if err := peer.FSM.Event(context.Background(), PeerEventLeave); err != nil {
216
				peer.Log.Errorf("peer fsm event failed: %s", err.Error())
217
				return true
218
			}
219
		}
220

221
		// If no peer exists in the dag of the task,
222
		// delete the peer.
223
		degree, err := peer.Task.PeerDegree(peer.ID)
224
		if err != nil {
225
			p.Delete(peer.ID)
226
			peer.Log.Info("peer has been reclaimed")
227
			return true
228
		}
229

230
		// If the task dag size exceeds the limit,
231
		// then set the peer state to PeerStateLeave which state is
232
		// PeerStateSucceeded, and degree is zero.
233
		if peer.Task.PeerCount() > PeerCountLimitForTask &&
234
			peer.FSM.Is(PeerStateSucceeded) && degree == 0 {
235
			peer.Log.Info("task dag size exceeds the limit, causing the peer to leave")
236
			if err := peer.FSM.Event(context.Background(), PeerEventLeave); err != nil {
237
				peer.Log.Errorf("peer fsm event failed: %s", err.Error())
238
				return true
239
			}
240

241
			p.Delete(peer.ID)
242
			peer.Log.Info("peer has been reclaimed")
243
			return true
244
		}
245

246
		return true
247
	})
248

249
	return nil
250
}
251

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.