kraken
159 строк · 3.7 Кб
// Copyright (c) 2016-2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14package agentstorage15
16import (17"fmt"18"regexp"19"sync"20
21"github.com/uber/kraken/core"22"github.com/uber/kraken/lib/store/metadata"23"github.com/uber/kraken/utils/log"24)
25
// _pieceStatusSuffix is the suffix reported by pieceStatusMetadata.GetSuffix
// and the source of the pattern registered with the metadata package in init.
const _pieceStatusSuffix = "_status"

28func init() {29metadata.Register(regexp.MustCompile(_pieceStatusSuffix), pieceStatusMetadataFactory{})30}
31
// pieceStatus is the state of a single piece. Serialize writes each status to
// disk as one byte, so the numeric values below must stay stable.
type pieceStatus int

const (
	// _empty is the zero value and the status Deserialize falls back to for
	// any byte it does not recognize.
	_empty pieceStatus = 0
	// _complete marks a finished piece; restorePieces counts these.
	_complete pieceStatus = 1
	// _dirty is set by tryMarkDirty when it claims an empty piece. Deserialize
	// does not accept it and resets such entries to _empty.
	_dirty pieceStatus = 2
)
39
40type pieceStatusMetadataFactory struct{}41
42func (m pieceStatusMetadataFactory) Create(suffix string) metadata.Metadata {43return &pieceStatusMetadata{}44}
45
46// pieceStatusMetadata stores pieces statuses as metadata on disk.
47type pieceStatusMetadata struct {48pieces []*piece49}
50
51func newPieceStatusMetadata(pieces []*piece) *pieceStatusMetadata {52return &pieceStatusMetadata{pieces}53}
54
55func (m *pieceStatusMetadata) GetSuffix() string {56return _pieceStatusSuffix57}
58
59func (m *pieceStatusMetadata) Movable() bool {60return true61}
62
63func (m *pieceStatusMetadata) Serialize() ([]byte, error) {64b := make([]byte, len(m.pieces))65for i, p := range m.pieces {66b[i] = byte(p.status)67}68return b, nil69}
70
71func (m *pieceStatusMetadata) Deserialize(b []byte) error {72m.pieces = make([]*piece, len(b))73for i := range b {74status := pieceStatus(b[i])75if status != _empty && status != _complete {76log.Errorf("Unexpected status in piece metadata: %d", status)77status = _empty78}79m.pieces[i] = &piece{status: status}80}81return nil82}
83
84type piece struct {85sync.RWMutex86status pieceStatus
87}
88
89func (p *piece) complete() bool {90p.RLock()91defer p.RUnlock()92return p.status == _complete93}
94
95func (p *piece) dirty() bool {96p.RLock()97defer p.RUnlock()98return p.status == _dirty99}
100
101func (p *piece) tryMarkDirty() (dirty, complete bool) {102p.Lock()103defer p.Unlock()104
105switch p.status {106case _empty:107p.status = _dirty108case _dirty:109dirty = true110case _complete:111complete = true112default:113log.Fatalf("Unknown piece status: %d", p.status)114}115return116}
117
118func (p *piece) markEmpty() {119p.Lock()120defer p.Unlock()121p.status = _empty122}
123
124func (p *piece) markComplete() {125p.Lock()126defer p.Unlock()127p.status = _complete128}
129
130// restorePieces reads piece metadata from disk and restores the in-memory piece
131// statuses. A naive solution would be to read the entire blob from disk and
132// hash the pieces to determine completion status -- however, this is very
133// expensive. Instead, Torrent tracks completed pieces on disk via metadata
134// as they are written.
135func restorePieces(136d core.Digest,137cads caDownloadStore,138numPieces int) (pieces []*piece, numComplete int, err error) {139
140for i := 0; i < numPieces; i++ {141pieces = append(pieces, &piece{status: _empty})142}143md := newPieceStatusMetadata(pieces)144if err := cads.Download().GetOrSetMetadata(d.Hex(), md); cads.InCacheError(err) {145// File is in cache state -- initialize completed pieces.146for _, p := range pieces {147p.status = _complete148}149return pieces, numPieces, nil150} else if err != nil {151return nil, 0, fmt.Errorf("get or set piece metadata: %s", err)152}153for _, p := range md.pieces {154if p.status == _complete {155numComplete++156}157}158return md.pieces, numComplete, nil159}
160