kraken
130 строк · 4.4 Кб
// Copyright (c) 2016-2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14package agentstorage15
16import (17"fmt"18"os"19
20"github.com/uber-go/tally"21"github.com/willf/bitset"22
23"github.com/uber/kraken/core"24"github.com/uber/kraken/lib/store"25"github.com/uber/kraken/lib/store/metadata"26"github.com/uber/kraken/lib/torrent/storage"27"github.com/uber/kraken/tracker/metainfoclient"28)
29
30// TorrentArchive is capable of initializing torrents in the download directory
31// and serving torrents from either the download or cache directory.
32type TorrentArchive struct {33stats tally.Scope34cads *store.CADownloadStore35metaInfoClient metainfoclient.Client36}
37
38// NewTorrentArchive creates a new TorrentArchive.
39func NewTorrentArchive(40stats tally.Scope,41cads *store.CADownloadStore,42mic metainfoclient.Client) *TorrentArchive {43
44stats = stats.Tagged(map[string]string{45"module": "agenttorrentarchive",46})47
48return &TorrentArchive{stats, cads, mic}49}
50
51// Stat returns TorrentInfo for the given digest. Returns os.ErrNotExist if the
52// file does not exist. Ignores namespace.
53func (a *TorrentArchive) Stat(namespace string, d core.Digest) (*storage.TorrentInfo, error) {54var tm metadata.TorrentMeta55if err := a.cads.Any().GetMetadata(d.Hex(), &tm); err != nil {56return nil, err57}58var psm pieceStatusMetadata59if err := a.cads.Any().GetMetadata(d.Hex(), &psm); err != nil {60return nil, err61}62b := bitset.New(uint(len(psm.pieces)))63for i, p := range psm.pieces {64if p.status == _complete {65b.Set(uint(i))66}67}68return storage.NewTorrentInfo(tm.MetaInfo, b), nil69}
70
71// CreateTorrent returns a Torrent for either an existing metainfo / file on
72// disk, or downloads metainfo and initializes the file. Returns ErrNotFound
73// if no metainfo was found.
74func (a *TorrentArchive) CreateTorrent(namespace string, d core.Digest) (storage.Torrent, error) {75var tm metadata.TorrentMeta76if err := a.cads.Any().GetMetadata(d.Hex(), &tm); os.IsNotExist(err) {77downloadTimer := a.stats.Timer("metainfo_download").Start()78mi, err := a.metaInfoClient.Download(namespace, d)79if err != nil {80if err == metainfoclient.ErrNotFound {81return nil, storage.ErrNotFound82}83return nil, fmt.Errorf("download metainfo: %s", err)84}85downloadTimer.Stop()86
87// There's a race condition here, but it's "okay"... Basically, we could88// initialize a download file with metainfo that is rejected by file store,89// because someone else beats us to it. However, we catch a lucky break90// because the only piece of metainfo we use is file length -- which digest91// is derived from, so it's "okay".92createErr := a.cads.CreateDownloadFile(mi.Digest().Hex(), mi.Length())93if createErr != nil &&94!(a.cads.InDownloadError(createErr) || a.cads.InCacheError(createErr)) {95return nil, fmt.Errorf("create download file: %s", createErr)96}97tm.MetaInfo = mi98if err := a.cads.Any().GetOrSetMetadata(d.Hex(), &tm); err != nil {99return nil, fmt.Errorf("get or set metainfo: %s", err)100}101} else if err != nil {102return nil, fmt.Errorf("get metainfo: %s", err)103}104t, err := NewTorrent(a.cads, tm.MetaInfo)105if err != nil {106return nil, fmt.Errorf("initialize torrent: %s", err)107}108return t, nil109}
110
111// GetTorrent returns a Torrent for an existing metainfo / file on disk. Ignores namespace.
112func (a *TorrentArchive) GetTorrent(namespace string, d core.Digest) (storage.Torrent, error) {113var tm metadata.TorrentMeta114if err := a.cads.Any().GetMetadata(d.Hex(), &tm); err != nil {115return nil, fmt.Errorf("get metainfo: %s", err)116}117t, err := NewTorrent(a.cads, tm.MetaInfo)118if err != nil {119return nil, fmt.Errorf("initialize torrent: %s", err)120}121return t, nil122}
123
124// DeleteTorrent deletes a torrent from disk.
125func (a *TorrentArchive) DeleteTorrent(d core.Digest) error {126if err := a.cads.Any().DeleteFile(d.Hex()); err != nil && !os.IsNotExist(err) {127return err128}129return nil130}
131