kraken

Форк
0
302 строки · 8.6 Кб
1
// Copyright (c) 2016-2019 Uber Technologies, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
package agentstorage
15

16
import (
17
	"errors"
18
	"fmt"
19
	"io"
20
	"os"
21

22
	"github.com/uber/kraken/core"
23
	"github.com/uber/kraken/lib/store"
24
	"github.com/uber/kraken/lib/torrent/storage"
25
	"github.com/uber/kraken/lib/torrent/storage/piecereader"
26
	"github.com/uber/kraken/utils/log"
27

28
	"github.com/willf/bitset"
29
	"go.uber.org/atomic"
30
)
31

32
var (
	// errPieceNotComplete is returned by GetPieceReader when the requested
	// piece has not been completely written yet.
	errPieceNotComplete = errors.New("piece not complete")

	// errWritePieceConflict is returned by WritePiece when the piece is
	// already marked dirty, i.e. another writer currently owns it.
	errWritePieceConflict = errors.New("piece is already being written to")
)
36

37
// caDownloadStore defines the CADownloadStore methods which Torrent requires. Useful
38
// for testing purposes, where we need to mock certain methods.
39
type caDownloadStore interface {
40
	MoveDownloadFileToCache(name string) error
41
	GetDownloadFileReadWriter(name string) (store.FileReadWriter, error)
42
	Any() *store.CADownloadStoreScope
43
	Download() *store.CADownloadStoreScope
44
	InCacheError(error) bool
45
}
46

47
// Torrent implements a Torrent on top of an AgentFileStore.
48
// It Allows concurrent writes on distinct pieces, and concurrent reads on all
49
// pieces. Behavior is undefined if multiple Torrent instances are backed
50
// by the same file store and metainfo.
51
type Torrent struct {
52
	metaInfo    *core.MetaInfo
53
	cads        caDownloadStore
54
	pieces      []*piece
55
	numComplete *atomic.Int32
56
	committed   *atomic.Bool
57
}
58

59
// NewTorrent creates a new Torrent.
60
func NewTorrent(cads caDownloadStore, mi *core.MetaInfo) (*Torrent, error) {
61
	pieces, numComplete, err := restorePieces(mi.Digest(), cads, mi.NumPieces())
62
	if err != nil {
63
		return nil, fmt.Errorf("restore pieces: %s", err)
64
	}
65

66
	committed := false
67
	if numComplete == len(pieces) {
68
		if err := cads.MoveDownloadFileToCache(mi.Digest().Hex()); err != nil && !os.IsExist(err) {
69
			return nil, fmt.Errorf("move file to cache: %s", err)
70
		}
71
		committed = true
72
	}
73

74
	return &Torrent{
75
		cads:        cads,
76
		metaInfo:    mi,
77
		pieces:      pieces,
78
		numComplete: atomic.NewInt32(int32(numComplete)),
79
		committed:   atomic.NewBool(committed),
80
	}, nil
81
}
82

83
// Digest returns the digest of the target blob.
84
func (t *Torrent) Digest() core.Digest {
85
	return t.metaInfo.Digest()
86
}
87

88
// Stat returns the storage.TorrentInfo for t.
89
func (t *Torrent) Stat() *storage.TorrentInfo {
90
	return storage.NewTorrentInfo(t.metaInfo, t.Bitfield())
91
}
92

93
// InfoHash returns the torrent metainfo hash.
94
func (t *Torrent) InfoHash() core.InfoHash {
95
	return t.metaInfo.InfoHash()
96
}
97

98
// NumPieces returns the number of pieces in the torrent.
99
func (t *Torrent) NumPieces() int {
100
	return len(t.pieces)
101
}
102

103
// Length returns the length of the target file.
104
func (t *Torrent) Length() int64 {
105
	return t.metaInfo.Length()
106
}
107

108
// PieceLength returns the length of piece pi.
109
func (t *Torrent) PieceLength(pi int) int64 {
110
	return t.metaInfo.GetPieceLength(pi)
111
}
112

113
// MaxPieceLength returns the longest piece length of the torrent.
114
func (t *Torrent) MaxPieceLength() int64 {
115
	return t.PieceLength(0)
116
}
117

118
// Complete indicates whether the torrent is complete or not. Completeness is
119
// defined by whether the torrent file has been committed to the cache directory.
120
func (t *Torrent) Complete() bool {
121
	return t.committed.Load()
122
}
123

124
// BytesDownloaded returns an estimate of the number of bytes downloaded in the
125
// torrent.
126
func (t *Torrent) BytesDownloaded() int64 {
127
	return min(int64(t.numComplete.Load())*t.metaInfo.PieceLength(), t.metaInfo.Length())
128
}
129

130
// Bitfield returns the bitfield of pieces where true denotes a complete piece
131
// and false denotes an incomplete piece.
132
func (t *Torrent) Bitfield() *bitset.BitSet {
133
	bitfield := bitset.New(uint(len(t.pieces)))
134
	for i, p := range t.pieces {
135
		if p.complete() {
136
			bitfield.Set(uint(i))
137
		}
138
	}
139
	return bitfield
140
}
141

142
func (t *Torrent) String() string {
143
	downloaded := int(float64(t.BytesDownloaded()) / float64(t.metaInfo.Length()) * 100)
144
	return fmt.Sprintf(
145
		"torrent(name=%s, hash=%s, downloaded=%d%%)",
146
		t.Digest().Hex(), t.InfoHash().Hex(), downloaded)
147
}
148

149
// getPiece returns the piece at index pi, or an error if pi does not lie
// within [0, len(t.pieces)). Negative indices are rejected explicitly so that
// a bad index from a caller yields an error instead of an out-of-range panic.
func (t *Torrent) getPiece(pi int) (*piece, error) {
	if pi < 0 || pi >= len(t.pieces) {
		return nil, fmt.Errorf("invalid piece index %d: num pieces = %d", pi, len(t.pieces))
	}
	return t.pieces[pi], nil
}
155

156
// markPieceComplete must only be called once per piece.
157
func (t *Torrent) markPieceComplete(pi int) error {
158
	updated, err := t.cads.Download().SetMetadataAt(
159
		t.Digest().Hex(), &pieceStatusMetadata{}, []byte{byte(_complete)}, int64(pi))
160
	if err != nil {
161
		return fmt.Errorf("write piece metadata: %s", err)
162
	}
163
	if !updated {
164
		// This could mean there's another thread with a Torrent instance using
165
		// the same file as us.
166
		log.Errorf(
167
			"Invariant violation: piece marked complete twice: piece %d in %s", pi, t.Digest().Hex())
168
	}
169
	t.pieces[pi].markComplete()
170
	t.numComplete.Inc()
171
	return nil
172
}
173

174
// writePiece writes data to piece pi. If the write succeeds, marks the piece as completed.
175
func (t *Torrent) writePiece(src storage.PieceReader, pi int) error {
176
	f, err := t.cads.GetDownloadFileReadWriter(t.metaInfo.Digest().Hex())
177
	if err != nil {
178
		return fmt.Errorf("get download writer: %s", err)
179
	}
180
	defer f.Close()
181

182
	h := core.PieceHash()
183
	r := io.TeeReader(src, h) // Calculates piece sum as we write to file.
184

185
	if _, err := f.Seek(t.getFileOffset(pi), 0); err != nil {
186
		return fmt.Errorf("seek: %s", err)
187
	}
188
	if _, err := io.Copy(f, r); err != nil {
189
		return fmt.Errorf("copy: %s", err)
190
	}
191
	if h.Sum32() != t.metaInfo.GetPieceSum(pi) {
192
		return errors.New("invalid piece sum")
193
	}
194

195
	if err := t.markPieceComplete(pi); err != nil {
196
		return fmt.Errorf("mark piece complete: %s", err)
197
	}
198
	return nil
199
}
200

201
// WritePiece writes data to piece pi.
202
func (t *Torrent) WritePiece(src storage.PieceReader, pi int) error {
203
	piece, err := t.getPiece(pi)
204
	if err != nil {
205
		return err
206
	}
207
	if int64(src.Length()) != t.PieceLength(pi) {
208
		return fmt.Errorf(
209
			"invalid piece length: expected %d, got %d", t.PieceLength(pi), src.Length())
210
	}
211

212
	// Exit quickly if the piece is not writable.
213
	if piece.complete() {
214
		return storage.ErrPieceComplete
215
	}
216
	if piece.dirty() {
217
		return errWritePieceConflict
218
	}
219

220
	dirty, complete := piece.tryMarkDirty()
221
	if dirty {
222
		return errWritePieceConflict
223
	} else if complete {
224
		return storage.ErrPieceComplete
225
	}
226

227
	// At this point, we've determined that the piece is not complete and ensured
228
	// we are the only thread which may write the piece. We do not block other
229
	// threads from checking if the piece is writable.
230

231
	if err := t.writePiece(src, pi); err != nil {
232
		// Allow other threads to write this piece since we mysteriously failed.
233
		piece.markEmpty()
234
		return fmt.Errorf("write piece: %s", err)
235
	}
236

237
	if int(t.numComplete.Load()) == len(t.pieces) {
238
		// Multiple threads may attempt to move the download file to cache, however
239
		// only one will succeed while the others will receive (and ignore) file exist
240
		// error.
241
		err := t.cads.MoveDownloadFileToCache(t.metaInfo.Digest().Hex())
242
		if err != nil && !os.IsExist(err) {
243
			return fmt.Errorf("download completed but failed to move file to cache directory: %s", err)
244
		}
245
		t.committed.Store(true)
246
	}
247

248
	return nil
249
}
250

251
type opener struct {
252
	torrent *Torrent
253
}
254

255
// Open returns a file reader for the torrent's blob, looked up by digest hex
// through the store's Any scope.
func (o *opener) Open() (store.FileReader, error) {
	return o.torrent.cads.Any().GetFileReader(o.torrent.Digest().Hex())
}
258

259
// GetPieceReader returns a reader for piece pi.
260
func (t *Torrent) GetPieceReader(pi int) (storage.PieceReader, error) {
261
	piece, err := t.getPiece(pi)
262
	if err != nil {
263
		return nil, err
264
	}
265
	if !piece.complete() {
266
		return nil, errPieceNotComplete
267
	}
268
	return piecereader.NewFileReader(t.getFileOffset(pi), t.PieceLength(pi), &opener{t}), nil
269
}
270

271
// HasPiece returns if piece pi is complete.
272
func (t *Torrent) HasPiece(pi int) bool {
273
	piece, err := t.getPiece(pi)
274
	if err != nil {
275
		return false
276
	}
277
	return piece.complete()
278
}
279

280
// MissingPieces returns the indeces of all missing pieces.
281
func (t *Torrent) MissingPieces() []int {
282
	var missing []int
283
	for i, p := range t.pieces {
284
		if !p.complete() {
285
			missing = append(missing, i)
286
		}
287
	}
288
	return missing
289
}
290

291
// getFileOffset calculates the offset in the torrent file given piece index.
292
// Assumes pi is a valid piece index.
293
func (t *Torrent) getFileOffset(pi int) int64 {
294
	return t.metaInfo.PieceLength() * int64(pi)
295
}
296

297
// min returns the smaller of a and b.
func min(a, b int64) int64 {
	if a < b {
		return a
	}
	return b
}
303

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.