kraken
302 строки · 8.6 Кб
// Copyright (c) 2016-2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14package agentstorage15
16import (17"errors"18"fmt"19"io"20"os"21
22"github.com/uber/kraken/core"23"github.com/uber/kraken/lib/store"24"github.com/uber/kraken/lib/torrent/storage"25"github.com/uber/kraken/lib/torrent/storage/piecereader"26"github.com/uber/kraken/utils/log"27
28"github.com/willf/bitset"29"go.uber.org/atomic"30)
31
// Package-private sentinel errors produced by Torrent piece operations.
var (
	// errWritePieceConflict is returned by WritePiece when the target piece is
	// already marked dirty, i.e. a write to it is in progress.
	errWritePieceConflict = errors.New("piece is already being written to")

	// errPieceNotComplete is returned by GetPieceReader when the requested
	// piece has not been completed yet.
	errPieceNotComplete = errors.New("piece not complete")
)
36
37// caDownloadStore defines the CADownloadStore methods which Torrent requires. Useful
38// for testing purposes, where we need to mock certain methods.
39type caDownloadStore interface {40MoveDownloadFileToCache(name string) error41GetDownloadFileReadWriter(name string) (store.FileReadWriter, error)42Any() *store.CADownloadStoreScope43Download() *store.CADownloadStoreScope44InCacheError(error) bool45}
46
47// Torrent implements a Torrent on top of an AgentFileStore.
48// It Allows concurrent writes on distinct pieces, and concurrent reads on all
49// pieces. Behavior is undefined if multiple Torrent instances are backed
50// by the same file store and metainfo.
51type Torrent struct {52metaInfo *core.MetaInfo53cads caDownloadStore
54pieces []*piece55numComplete *atomic.Int3256committed *atomic.Bool57}
58
59// NewTorrent creates a new Torrent.
60func NewTorrent(cads caDownloadStore, mi *core.MetaInfo) (*Torrent, error) {61pieces, numComplete, err := restorePieces(mi.Digest(), cads, mi.NumPieces())62if err != nil {63return nil, fmt.Errorf("restore pieces: %s", err)64}65
66committed := false67if numComplete == len(pieces) {68if err := cads.MoveDownloadFileToCache(mi.Digest().Hex()); err != nil && !os.IsExist(err) {69return nil, fmt.Errorf("move file to cache: %s", err)70}71committed = true72}73
74return &Torrent{75cads: cads,76metaInfo: mi,77pieces: pieces,78numComplete: atomic.NewInt32(int32(numComplete)),79committed: atomic.NewBool(committed),80}, nil81}
82
83// Digest returns the digest of the target blob.
84func (t *Torrent) Digest() core.Digest {85return t.metaInfo.Digest()86}
87
88// Stat returns the storage.TorrentInfo for t.
89func (t *Torrent) Stat() *storage.TorrentInfo {90return storage.NewTorrentInfo(t.metaInfo, t.Bitfield())91}
92
93// InfoHash returns the torrent metainfo hash.
94func (t *Torrent) InfoHash() core.InfoHash {95return t.metaInfo.InfoHash()96}
97
98// NumPieces returns the number of pieces in the torrent.
99func (t *Torrent) NumPieces() int {100return len(t.pieces)101}
102
103// Length returns the length of the target file.
104func (t *Torrent) Length() int64 {105return t.metaInfo.Length()106}
107
108// PieceLength returns the length of piece pi.
109func (t *Torrent) PieceLength(pi int) int64 {110return t.metaInfo.GetPieceLength(pi)111}
112
113// MaxPieceLength returns the longest piece length of the torrent.
114func (t *Torrent) MaxPieceLength() int64 {115return t.PieceLength(0)116}
117
118// Complete indicates whether the torrent is complete or not. Completeness is
119// defined by whether the torrent file has been committed to the cache directory.
120func (t *Torrent) Complete() bool {121return t.committed.Load()122}
123
124// BytesDownloaded returns an estimate of the number of bytes downloaded in the
125// torrent.
126func (t *Torrent) BytesDownloaded() int64 {127return min(int64(t.numComplete.Load())*t.metaInfo.PieceLength(), t.metaInfo.Length())128}
129
130// Bitfield returns the bitfield of pieces where true denotes a complete piece
131// and false denotes an incomplete piece.
132func (t *Torrent) Bitfield() *bitset.BitSet {133bitfield := bitset.New(uint(len(t.pieces)))134for i, p := range t.pieces {135if p.complete() {136bitfield.Set(uint(i))137}138}139return bitfield140}
141
142func (t *Torrent) String() string {143downloaded := int(float64(t.BytesDownloaded()) / float64(t.metaInfo.Length()) * 100)144return fmt.Sprintf(145"torrent(name=%s, hash=%s, downloaded=%d%%)",146t.Digest().Hex(), t.InfoHash().Hex(), downloaded)147}
148
149func (t *Torrent) getPiece(pi int) (*piece, error) {150if pi >= len(t.pieces) {151return nil, fmt.Errorf("invalid piece index %d: num pieces = %d", pi, len(t.pieces))152}153return t.pieces[pi], nil154}
155
156// markPieceComplete must only be called once per piece.
157func (t *Torrent) markPieceComplete(pi int) error {158updated, err := t.cads.Download().SetMetadataAt(159t.Digest().Hex(), &pieceStatusMetadata{}, []byte{byte(_complete)}, int64(pi))160if err != nil {161return fmt.Errorf("write piece metadata: %s", err)162}163if !updated {164// This could mean there's another thread with a Torrent instance using165// the same file as us.166log.Errorf(167"Invariant violation: piece marked complete twice: piece %d in %s", pi, t.Digest().Hex())168}169t.pieces[pi].markComplete()170t.numComplete.Inc()171return nil172}
173
174// writePiece writes data to piece pi. If the write succeeds, marks the piece as completed.
175func (t *Torrent) writePiece(src storage.PieceReader, pi int) error {176f, err := t.cads.GetDownloadFileReadWriter(t.metaInfo.Digest().Hex())177if err != nil {178return fmt.Errorf("get download writer: %s", err)179}180defer f.Close()181
182h := core.PieceHash()183r := io.TeeReader(src, h) // Calculates piece sum as we write to file.184
185if _, err := f.Seek(t.getFileOffset(pi), 0); err != nil {186return fmt.Errorf("seek: %s", err)187}188if _, err := io.Copy(f, r); err != nil {189return fmt.Errorf("copy: %s", err)190}191if h.Sum32() != t.metaInfo.GetPieceSum(pi) {192return errors.New("invalid piece sum")193}194
195if err := t.markPieceComplete(pi); err != nil {196return fmt.Errorf("mark piece complete: %s", err)197}198return nil199}
200
201// WritePiece writes data to piece pi.
202func (t *Torrent) WritePiece(src storage.PieceReader, pi int) error {203piece, err := t.getPiece(pi)204if err != nil {205return err206}207if int64(src.Length()) != t.PieceLength(pi) {208return fmt.Errorf(209"invalid piece length: expected %d, got %d", t.PieceLength(pi), src.Length())210}211
212// Exit quickly if the piece is not writable.213if piece.complete() {214return storage.ErrPieceComplete215}216if piece.dirty() {217return errWritePieceConflict218}219
220dirty, complete := piece.tryMarkDirty()221if dirty {222return errWritePieceConflict223} else if complete {224return storage.ErrPieceComplete225}226
227// At this point, we've determined that the piece is not complete and ensured228// we are the only thread which may write the piece. We do not block other229// threads from checking if the piece is writable.230
231if err := t.writePiece(src, pi); err != nil {232// Allow other threads to write this piece since we mysteriously failed.233piece.markEmpty()234return fmt.Errorf("write piece: %s", err)235}236
237if int(t.numComplete.Load()) == len(t.pieces) {238// Multiple threads may attempt to move the download file to cache, however239// only one will succeed while the others will receive (and ignore) file exist240// error.241err := t.cads.MoveDownloadFileToCache(t.metaInfo.Digest().Hex())242if err != nil && !os.IsExist(err) {243return fmt.Errorf("download completed but failed to move file to cache directory: %s", err)244}245t.committed.Store(true)246}247
248return nil249}
250
251type opener struct {252torrent *Torrent253}
254
255func (o *opener) Open() (store.FileReader, error) {256return o.torrent.cads.Any().GetFileReader(o.torrent.Digest().Hex())257}
258
259// GetPieceReader returns a reader for piece pi.
260func (t *Torrent) GetPieceReader(pi int) (storage.PieceReader, error) {261piece, err := t.getPiece(pi)262if err != nil {263return nil, err264}265if !piece.complete() {266return nil, errPieceNotComplete267}268return piecereader.NewFileReader(t.getFileOffset(pi), t.PieceLength(pi), &opener{t}), nil269}
270
271// HasPiece returns if piece pi is complete.
272func (t *Torrent) HasPiece(pi int) bool {273piece, err := t.getPiece(pi)274if err != nil {275return false276}277return piece.complete()278}
279
280// MissingPieces returns the indeces of all missing pieces.
281func (t *Torrent) MissingPieces() []int {282var missing []int283for i, p := range t.pieces {284if !p.complete() {285missing = append(missing, i)286}287}288return missing289}
290
291// getFileOffset calculates the offset in the torrent file given piece index.
292// Assumes pi is a valid piece index.
293func (t *Torrent) getFileOffset(pi int) int64 {294return t.metaInfo.PieceLength() * int64(pi)295}
296
// min returns the smaller of a and b.
func min(a, b int64) int64 {
	if b < a {
		return b
	}
	return a
}
303