kraken
750 lines · 22.5 KB
1// Copyright (c) 2016-2019 Uber Technologies, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14package blobserver
15
16import (
17"encoding/json"
18"fmt"
19"io"
20"net/http"
21_ "net/http/pprof" // Registers /debug/pprof endpoints in http.DefaultServeMux.
22"os"
23"strconv"
24"strings"
25"sync"
26"time"
27
28"github.com/uber/kraken/core"
29"github.com/uber/kraken/lib/backend"
30"github.com/uber/kraken/lib/backend/backenderrors"
31"github.com/uber/kraken/lib/blobrefresh"
32"github.com/uber/kraken/lib/hashring"
33"github.com/uber/kraken/lib/metainfogen"
34"github.com/uber/kraken/lib/middleware"
35"github.com/uber/kraken/lib/persistedretry"
36"github.com/uber/kraken/lib/persistedretry/writeback"
37"github.com/uber/kraken/lib/store"
38"github.com/uber/kraken/lib/store/metadata"
39"github.com/uber/kraken/origin/blobclient"
40"github.com/uber/kraken/utils/errutil"
41"github.com/uber/kraken/utils/handler"
42"github.com/uber/kraken/utils/httputil"
43"github.com/uber/kraken/utils/listener"
44"github.com/uber/kraken/utils/log"
45"github.com/uber/kraken/utils/memsize"
46"github.com/uber/kraken/utils/stringset"
47
48"github.com/andres-erbsen/clock"
49"github.com/go-chi/chi"
50"github.com/uber-go/tally"
51)
52
// _uploadChunkSize is the chunk size used for chunked blob uploads.
// NOTE(review): not referenced in this chunk — presumably consumed by the
// uploader; confirm against uploader.go.
const _uploadChunkSize = 16 * memsize.MB
54
// Server defines a server that serves blob data for agent.
type Server struct {
	config            Config
	stats             tally.Scope // Tagged with module:blobserver in New.
	clk               clock.Clock
	addr              string        // This origin's own address within the hash ring.
	hashRing          hashring.Ring // Maps digests to the origins responsible for them.
	cas               *store.CAStore
	clientProvider    blobclient.Provider        // Creates clients for individual origin peers.
	clusterProvider   blobclient.ClusterProvider // Creates clients for remote origin clusters.
	backends          *backend.Manager           // Remote storage backends, looked up by namespace.
	blobRefresher     *blobrefresh.Refresher     // Asynchronously downloads missing blobs from backends.
	metaInfoGenerator *metainfogen.Generator     // Generates torrent metainfo for cached blobs.
	uploader          *uploader                  // Tracks chunked upload sessions.
	writeBackManager  persistedretry.Manager     // Schedules write-back of uploaded blobs to backends.

	// This is an unfortunate coupling between the p2p client and the blob server.
	// Tracker queries the origin cluster to discover which origins can seed
	// a given torrent, however this requires blob server to understand the
	// context of the p2p client running alongside it.
	pctx core.PeerContext
}
77
78// New initializes a new Server.
79func New(
80config Config,
81stats tally.Scope,
82clk clock.Clock,
83addr string,
84hashRing hashring.Ring,
85cas *store.CAStore,
86clientProvider blobclient.Provider,
87clusterProvider blobclient.ClusterProvider,
88pctx core.PeerContext,
89backends *backend.Manager,
90blobRefresher *blobrefresh.Refresher,
91metaInfoGenerator *metainfogen.Generator,
92writeBackManager persistedretry.Manager) (*Server, error) {
93
94config = config.applyDefaults()
95
96stats = stats.Tagged(map[string]string{
97"module": "blobserver",
98})
99
100return &Server{
101config: config,
102stats: stats,
103clk: clk,
104addr: addr,
105hashRing: hashRing,
106cas: cas,
107clientProvider: clientProvider,
108clusterProvider: clusterProvider,
109backends: backends,
110blobRefresher: blobRefresher,
111metaInfoGenerator: metaInfoGenerator,
112uploader: newUploader(cas),
113writeBackManager: writeBackManager,
114pctx: pctx,
115}, nil
116}
117
// Addr returns the address the blob server is configured on. This is the
// identity used to find this origin within the hash ring.
func (s *Server) Addr() string {
	return s.addr
}
122
// Handler returns an http handler for the blob server.
//
// Public endpoints serve agents and external clients; internal endpoints are
// used for intra-cluster transfers, replication, and debugging.
func (s *Server) Handler() http.Handler {
	r := chi.NewRouter()

	// Emit per-endpoint status-code and latency metrics.
	r.Use(middleware.StatusCounter(s.stats))
	r.Use(middleware.LatencyTimer(s.stats))

	// Public endpoints:

	r.Get("/health", handler.Wrap(s.healthCheckHandler))

	r.Get("/blobs/{digest}/locations", handler.Wrap(s.getLocationsHandler))

	// Chunked external upload: start a session, patch chunks, then commit.
	r.Post("/namespace/{namespace}/blobs/{digest}/uploads", handler.Wrap(s.startClusterUploadHandler))
	r.Patch("/namespace/{namespace}/blobs/{digest}/uploads/{uid}", handler.Wrap(s.patchClusterUploadHandler))
	r.Put("/namespace/{namespace}/blobs/{digest}/uploads/{uid}", handler.Wrap(s.commitClusterUploadHandler))

	r.Get("/namespace/{namespace}/blobs/{digest}", handler.Wrap(s.downloadBlobHandler))

	r.Post("/namespace/{namespace}/blobs/{digest}/remote/{remote}", handler.Wrap(s.replicateToRemoteHandler))

	r.Post("/forcecleanup", handler.Wrap(s.forceCleanupHandler))

	// Internal endpoints:

	// Chunked internal transfer (not replicated or written back).
	r.Post("/internal/blobs/{digest}/uploads", handler.Wrap(s.startTransferHandler))
	r.Patch("/internal/blobs/{digest}/uploads/{uid}", handler.Wrap(s.patchTransferHandler))
	r.Put("/internal/blobs/{digest}/uploads/{uid}", handler.Wrap(s.commitTransferHandler))

	r.Delete("/internal/blobs/{digest}", handler.Wrap(s.deleteBlobHandler))

	r.Post("/internal/blobs/{digest}/metainfo", handler.Wrap(s.overwriteMetaInfoHandler))

	r.Get("/internal/peercontext", handler.Wrap(s.getPeerContextHandler))

	r.Head("/internal/namespace/{namespace}/blobs/{digest}", handler.Wrap(s.statHandler))

	r.Get("/internal/namespace/{namespace}/blobs/{digest}/metainfo", handler.Wrap(s.getMetaInfoHandler))

	r.Put(
		"/internal/duplicate/namespace/{namespace}/blobs/{digest}/uploads/{uid}",
		handler.Wrap(s.duplicateCommitClusterUploadHandler))

	r.Mount("/", http.DefaultServeMux) // Serves /debug/pprof endpoints.

	return r
}
170
// ListenAndServe is a blocking call which runs s. The listener configuration
// comes from s.config.Listener.
func (s *Server) ListenAndServe(h http.Handler) error {
	log.Infof("Starting blob server on %s", s.config.Listener)
	return listener.Serve(s.config.Listener, h)
}
176
177func (s *Server) healthCheckHandler(w http.ResponseWriter, r *http.Request) error {
178fmt.Fprintln(w, "OK")
179return nil
180}
181
// statHandler returns blob info if it exists.
//
// Query arg `local` (default "false"): when true, only the local cache is
// consulted; otherwise a cache miss falls through to the namespace's storage
// backend (see stat). Responds 404 when the blob is not found; on success the
// blob size is reported via the Content-Length header.
func (s *Server) statHandler(w http.ResponseWriter, r *http.Request) error {
	checkLocal, err := strconv.ParseBool(httputil.GetQueryArg(r, "local", "false"))
	if err != nil {
		return handler.Errorf("parse arg `local` as bool: %s", err)
	}
	namespace, err := httputil.ParseParam(r, "namespace")
	if err != nil {
		return err
	}
	d, err := httputil.ParseDigest(r, "digest")
	if err != nil {
		return err
	}

	bi, err := s.stat(namespace, d, checkLocal)
	if os.IsNotExist(err) {
		return handler.ErrorStatus(http.StatusNotFound)
	} else if err != nil {
		return fmt.Errorf("stat: %s", err)
	}
	w.Header().Set("Content-Length", strconv.FormatInt(bi.Size, 10))
	log.Debugf("successfully check blob %s exists", d.Hex())
	return nil
}
207
208func (s *Server) stat(namespace string, d core.Digest, checkLocal bool) (*core.BlobInfo, error) {
209fi, err := s.cas.GetCacheFileStat(d.Hex())
210if err == nil {
211return core.NewBlobInfo(fi.Size()), nil
212} else if os.IsNotExist(err) {
213if !checkLocal {
214client, err := s.backends.GetClient(namespace)
215if err != nil {
216return nil, fmt.Errorf("get backend client: %s", err)
217}
218if bi, err := client.Stat(namespace, d.Hex()); err == nil {
219return bi, nil
220} else if err == backenderrors.ErrBlobNotFound {
221return nil, os.ErrNotExist
222} else {
223return nil, fmt.Errorf("backend stat: %s", err)
224}
225}
226return nil, err // os.ErrNotExist
227}
228
229return nil, fmt.Errorf("stat cache file: %s", err)
230}
231
232func (s *Server) downloadBlobHandler(w http.ResponseWriter, r *http.Request) error {
233namespace, err := httputil.ParseParam(r, "namespace")
234if err != nil {
235return err
236}
237d, err := httputil.ParseDigest(r, "digest")
238if err != nil {
239return err
240}
241if err := s.downloadBlob(namespace, d, w); err != nil {
242return err
243}
244setOctetStreamContentType(w)
245return nil
246}
247
248func (s *Server) replicateToRemoteHandler(w http.ResponseWriter, r *http.Request) error {
249namespace, err := httputil.ParseParam(r, "namespace")
250if err != nil {
251return err
252}
253d, err := httputil.ParseDigest(r, "digest")
254if err != nil {
255return err
256}
257remote, err := httputil.ParseParam(r, "remote")
258if err != nil {
259return err
260}
261return s.replicateToRemote(namespace, d, remote)
262}
263
// replicateToRemote uploads the locally cached blob for d to the remote
// origin cluster at remoteDNS. If the blob is not cached, an async download
// from the namespace's backend is started instead and a "202 Accepted"
// handler error is returned (see startRemoteBlobDownload), signaling the
// caller to retry later.
func (s *Server) replicateToRemote(namespace string, d core.Digest, remoteDNS string) error {
	f, err := s.cas.GetCacheFileReader(d.Hex())
	if err != nil {
		if os.IsNotExist(err) {
			// Not cached yet; kick off a backend download without local
			// replication and report 202.
			return s.startRemoteBlobDownload(namespace, d, false)
		}
		return handler.Errorf("file store: %s", err)
	}
	defer f.Close()

	remote, err := s.clusterProvider.Provide(remoteDNS)
	if err != nil {
		return handler.Errorf("remote cluster provider: %s", err)
	}
	return remote.UploadBlob(namespace, d, f)
}
280
281// deleteBlobHandler deletes blob data.
282func (s *Server) deleteBlobHandler(w http.ResponseWriter, r *http.Request) error {
283d, err := httputil.ParseDigest(r, "digest")
284if err != nil {
285return err
286}
287if err := s.deleteBlob(d); err != nil {
288return err
289}
290setContentLength(w, 0)
291w.WriteHeader(http.StatusAccepted)
292log.Debugf("successfully delete blob %s", d.Hex())
293return nil
294}
295
296func (s *Server) getLocationsHandler(w http.ResponseWriter, r *http.Request) error {
297d, err := httputil.ParseDigest(r, "digest")
298if err != nil {
299return err
300}
301locs := s.hashRing.Locations(d)
302w.Header().Set("Origin-Locations", strings.Join(locs, ","))
303w.WriteHeader(http.StatusOK)
304return nil
305}
306
307// getPeerContextHandler returns the Server's peer context as JSON.
308func (s *Server) getPeerContextHandler(w http.ResponseWriter, r *http.Request) error {
309if err := json.NewEncoder(w).Encode(s.pctx); err != nil {
310return handler.Errorf("error converting peer context to json: %s", err)
311}
312return nil
313}
314
315func (s *Server) getMetaInfoHandler(w http.ResponseWriter, r *http.Request) error {
316namespace, err := httputil.ParseParam(r, "namespace")
317if err != nil {
318return err
319}
320d, err := httputil.ParseDigest(r, "digest")
321if err != nil {
322return err
323}
324raw, err := s.getMetaInfo(namespace, d)
325if err != nil {
326return err
327}
328w.Write(raw)
329return nil
330}
331
332func (s *Server) overwriteMetaInfoHandler(w http.ResponseWriter, r *http.Request) error {
333d, err := httputil.ParseDigest(r, "digest")
334if err != nil {
335return err
336}
337pieceLength, err := strconv.ParseInt(r.URL.Query().Get("piece_length"), 10, 64)
338if err != nil {
339return handler.Errorf("invalid piece_length argument: %s", err).Status(http.StatusBadRequest)
340}
341return s.overwriteMetaInfo(d, pieceLength)
342}
343
344// overwriteMetaInfo generates metainfo configured with pieceLength for d and
345// writes it to disk, overwriting any existing metainfo. Primarily intended for
346// benchmarking purposes.
347func (s *Server) overwriteMetaInfo(d core.Digest, pieceLength int64) error {
348f, err := s.cas.GetCacheFileReader(d.Hex())
349if err != nil {
350return handler.Errorf("get cache file: %s", err)
351}
352mi, err := core.NewMetaInfo(d, f, pieceLength)
353if err != nil {
354return handler.Errorf("create metainfo: %s", err)
355}
356if _, err := s.cas.SetCacheFileMetadata(d.Hex(), metadata.NewTorrentMeta(mi)); err != nil {
357return handler.Errorf("set metainfo: %s", err)
358}
359return nil
360}
361
362// getMetaInfo returns metainfo for d. If no blob exists under d, a download of
363// the blob from the storage backend configured for namespace will be initiated.
364// This download is asynchronous and getMetaInfo will immediately return a
365// "202 Accepted" server error.
366func (s *Server) getMetaInfo(namespace string, d core.Digest) ([]byte, error) {
367var tm metadata.TorrentMeta
368if err := s.cas.GetCacheFileMetadata(d.Hex(), &tm); os.IsNotExist(err) {
369return nil, s.startRemoteBlobDownload(namespace, d, true)
370} else if err != nil {
371return nil, handler.Errorf("get cache metadata: %s", err)
372}
373return tm.Serialize()
374}
375
// localReplicationHook replicates a freshly downloaded blob to the other
// origins responsible for it. It is registered as a blobrefresh.PostHook by
// startRemoteBlobDownload when local replication is requested.
type localReplicationHook struct {
	server *Server
}
379
// Run replicates d to the blob's replica origins, timing successful
// replications and counting failures.
func (h *localReplicationHook) Run(d core.Digest) {
	// The timer is only stopped (and thus recorded) on the success path, so
	// failed replications do not pollute the latency metric.
	timer := h.server.stats.Timer("replicate_blob").Start()
	if err := h.server.replicateBlobLocally(d); err != nil {
		// Don't return error here as we only want to cache storage backend errors.
		log.With("blob", d.Hex()).Errorf("Error replicating remote blob: %s", err)
		h.server.stats.Counter("replicate_blob_errors").Inc(1)
		return
	}
	timer.Stop()
}
390
391func (s *Server) startRemoteBlobDownload(
392namespace string, d core.Digest, replicateLocally bool) error {
393
394var hooks []blobrefresh.PostHook
395if replicateLocally {
396hooks = append(hooks, &localReplicationHook{s})
397}
398err := s.blobRefresher.Refresh(namespace, d, hooks...)
399switch err {
400case blobrefresh.ErrPending, nil:
401return handler.ErrorStatus(http.StatusAccepted)
402case blobrefresh.ErrNotFound:
403return handler.ErrorStatus(http.StatusNotFound)
404case blobrefresh.ErrWorkersBusy:
405return handler.ErrorStatus(http.StatusServiceUnavailable)
406default:
407return err
408}
409}
410
411func (s *Server) replicateBlobLocally(d core.Digest) error {
412return s.applyToReplicas(d, func(i int, client blobclient.Client) error {
413f, err := s.cas.GetCacheFileReader(d.Hex())
414if err != nil {
415return fmt.Errorf("get cache reader: %s", err)
416}
417if err := client.TransferBlob(d, f); err != nil {
418return fmt.Errorf("transfer blob: %s", err)
419}
420return nil
421})
422}
423
424// applyToReplicas applies f to the replicas of d concurrently in random order,
425// not including the current origin. Passes the index of the iteration to f.
426func (s *Server) applyToReplicas(d core.Digest, f func(i int, c blobclient.Client) error) error {
427replicas := stringset.FromSlice(s.hashRing.Locations(d))
428replicas.Remove(s.addr)
429
430var mu sync.Mutex
431var errs []error
432
433var wg sync.WaitGroup
434var i int
435for replica := range replicas {
436wg.Add(1)
437go func(i int, replica string) {
438defer wg.Done()
439if err := f(i, s.clientProvider.Provide(replica)); err != nil {
440mu.Lock()
441errs = append(errs, err)
442mu.Unlock()
443}
444}(i, replica)
445i++
446}
447wg.Wait()
448
449return errutil.Join(errs)
450}
451
452// downloadBlob downloads blob for d into dst. If no blob exists under d, a
453// download of the blob from the storage backend configured for namespace will
454// be initiated. This download is asynchronous and downloadBlob will immediately
455// return a "202 Accepted" handler error.
456func (s *Server) downloadBlob(namespace string, d core.Digest, dst io.Writer) error {
457f, err := s.cas.GetCacheFileReader(d.Hex())
458if os.IsNotExist(err) {
459return s.startRemoteBlobDownload(namespace, d, true)
460} else if err != nil {
461return handler.Errorf("get cache file: %s", err)
462}
463defer f.Close()
464
465if _, err := io.Copy(dst, f); err != nil {
466return handler.Errorf("copy blob: %s", err)
467}
468return nil
469}
470
471func (s *Server) deleteBlob(d core.Digest) error {
472if err := s.cas.DeleteCacheFile(d.Hex()); err != nil {
473if os.IsNotExist(err) {
474return handler.ErrorStatus(http.StatusNotFound)
475}
476return handler.Errorf("cannot delete blob data for digest %q: %s", d, err)
477}
478return nil
479}
480
481// startTransferHandler initializes an upload for internal blob transfers.
482func (s *Server) startTransferHandler(w http.ResponseWriter, r *http.Request) error {
483d, err := httputil.ParseDigest(r, "digest")
484if err != nil {
485return err
486}
487if ok, err := blobExists(s.cas, d); err != nil {
488return handler.Errorf("check blob: %s", err)
489} else if ok {
490return handler.ErrorStatus(http.StatusConflict)
491}
492uid, err := s.uploader.start(d)
493if err != nil {
494return err
495}
496setUploadLocation(w, uid)
497w.WriteHeader(http.StatusOK)
498return nil
499}
500
501// patchTransferHandler uploads a chunk of a blob for internal uploads.
502func (s *Server) patchTransferHandler(w http.ResponseWriter, r *http.Request) error {
503d, err := httputil.ParseDigest(r, "digest")
504if err != nil {
505return err
506}
507uid, err := httputil.ParseParam(r, "uid")
508if err != nil {
509return err
510}
511start, end, err := parseContentRange(r.Header)
512if err != nil {
513return err
514}
515return s.uploader.patch(d, uid, r.Body, start, end)
516}
517
518// commitTransferHandler commits the upload of an internal blob transfer.
519// Internal blob transfers are not replicated to the rest of the cluster.
520func (s *Server) commitTransferHandler(w http.ResponseWriter, r *http.Request) error {
521d, err := httputil.ParseDigest(r, "digest")
522if err != nil {
523return err
524}
525uid, err := httputil.ParseParam(r, "uid")
526if err != nil {
527return err
528}
529if err := s.uploader.commit(d, uid); err != nil {
530return err
531}
532if err := s.metaInfoGenerator.Generate(d); err != nil {
533return handler.Errorf("generate metainfo: %s", err)
534}
535return nil
536}
537
// handleUploadConflict ensures write-back has been scheduled before
// propagating a 409 Conflict from an upload step. It always returns the
// original error (unless scheduling the write-back itself fails).
func (s *Server) handleUploadConflict(err error, namespace string, d core.Digest) error {
	if herr, ok := err.(*handler.Error); ok && herr.GetStatus() == http.StatusConflict {
		// Even if the blob was already uploaded and committed to cache, it's
		// still possible that adding the write-back task failed. Clients short
		// circuit on conflict and return success, so we must make sure that if we
		// tell a client to stop before commit, the blob has been written back.
		if err := s.writeBack(namespace, d, 0); err != nil {
			return err
		}
	}
	return err
}
550
551// startClusterUploadHandler initializes an upload for external uploads.
552func (s *Server) startClusterUploadHandler(w http.ResponseWriter, r *http.Request) error {
553d, err := httputil.ParseDigest(r, "digest")
554if err != nil {
555return err
556}
557namespace, err := httputil.ParseParam(r, "namespace")
558if err != nil {
559return err
560}
561uid, err := s.uploader.start(d)
562if err != nil {
563return s.handleUploadConflict(err, namespace, d)
564}
565setUploadLocation(w, uid)
566w.WriteHeader(http.StatusOK)
567return nil
568}
569
570// patchClusterUploadHandler uploads a chunk of a blob for external uploads.
571func (s *Server) patchClusterUploadHandler(w http.ResponseWriter, r *http.Request) error {
572d, err := httputil.ParseDigest(r, "digest")
573if err != nil {
574return err
575}
576namespace, err := httputil.ParseParam(r, "namespace")
577if err != nil {
578return err
579}
580uid, err := httputil.ParseParam(r, "uid")
581if err != nil {
582return err
583}
584start, end, err := parseContentRange(r.Header)
585if err != nil {
586return err
587}
588if err := s.uploader.patch(d, uid, r.Body, start, end); err != nil {
589return s.handleUploadConflict(err, namespace, d)
590}
591return nil
592}
593
594// commitClusterUploadHandler commits an external blob upload asynchronously,
595// meaning the blob will be written back to remote storage in a non-blocking
596// fashion.
597func (s *Server) commitClusterUploadHandler(w http.ResponseWriter, r *http.Request) error {
598d, err := httputil.ParseDigest(r, "digest")
599if err != nil {
600return err
601}
602namespace, err := httputil.ParseParam(r, "namespace")
603if err != nil {
604return err
605}
606uid, err := httputil.ParseParam(r, "uid")
607if err != nil {
608return err
609}
610
611if err := s.uploader.commit(d, uid); err != nil {
612return s.handleUploadConflict(err, namespace, d)
613}
614if err := s.writeBack(namespace, d, 0); err != nil {
615return err
616}
617err = s.applyToReplicas(d, func(i int, client blobclient.Client) error {
618delay := s.config.DuplicateWriteBackStagger * time.Duration(i+1)
619f, err := s.cas.GetCacheFileReader(d.Hex())
620if err != nil {
621return fmt.Errorf("get cache file: %s", err)
622}
623if err := client.DuplicateUploadBlob(namespace, d, f, delay); err != nil {
624return fmt.Errorf("duplicate upload: %s", err)
625}
626return nil
627})
628if err != nil {
629s.stats.Counter("duplicate_write_back_errors").Inc(1)
630log.Errorf("Error duplicating write-back task to replicas: %s", err)
631}
632return nil
633}
634
635// duplicateCommitClusterUploadHandler commits a duplicate blob upload, which
636// will attempt to write-back after the requested delay.
637func (s *Server) duplicateCommitClusterUploadHandler(w http.ResponseWriter, r *http.Request) error {
638d, err := httputil.ParseDigest(r, "digest")
639if err != nil {
640return err
641}
642namespace, err := httputil.ParseParam(r, "namespace")
643if err != nil {
644return err
645}
646uid, err := httputil.ParseParam(r, "uid")
647if err != nil {
648return err
649}
650
651var dr blobclient.DuplicateCommitUploadRequest
652if err := json.NewDecoder(r.Body).Decode(&dr); err != nil {
653return handler.Errorf("decode body: %s", err)
654}
655delay := dr.Delay
656
657if err := s.uploader.commit(d, uid); err != nil {
658return err
659}
660return s.writeBack(namespace, d, delay)
661}
662
// writeBack marks the cache file for d as persisted, schedules a write-back
// task to upload it to the namespace's backend after delay, and generates
// torrent metainfo for the blob. The persist flag is consulted by maybeDelete
// to guarantee files are backed up before deletion.
func (s *Server) writeBack(namespace string, d core.Digest, delay time.Duration) error {
	if _, err := s.cas.SetCacheFileMetadata(d.Hex(), metadata.NewPersist(true)); err != nil {
		return handler.Errorf("set persist metadata: %s", err)
	}
	task := writeback.NewTask(namespace, d.Hex(), delay)
	if err := s.writeBackManager.Add(task); err != nil {
		return handler.Errorf("add write-back task: %s", err)
	}
	if err := s.metaInfoGenerator.Generate(d); err != nil {
		return handler.Errorf("generate metainfo: %s", err)
	}
	return nil
}
676
// forceCleanupHandler sweeps the local cache, deleting files older than the
// given TTL or no longer owned by this origin, and reports the outcome as
// JSON ({"deleted": [...], "errors": [...]}).
func (s *Server) forceCleanupHandler(w http.ResponseWriter, r *http.Request) error {
	// Note, this API is intended to be executed manually (i.e. curl), hence the
	// query arguments, usage of hours instead of nanoseconds, and JSON response
	// enumerating deleted files / errors.

	rawTTLHr := r.URL.Query().Get("ttl_hr")
	if rawTTLHr == "" {
		return handler.Errorf("query arg ttl_hr required").Status(http.StatusBadRequest)
	}
	ttlHr, err := strconv.Atoi(rawTTLHr)
	if err != nil {
		return handler.Errorf("invalid ttl_hr: %s", err).Status(http.StatusBadRequest)
	}
	ttl := time.Duration(ttlHr) * time.Hour

	names, err := s.cas.ListCacheFiles()
	if err != nil {
		return err
	}
	// Attempt each file independently; a failure on one file is recorded in
	// the response instead of aborting the whole sweep.
	var errs, deleted []string
	for _, name := range names {
		if ok, err := s.maybeDelete(name, ttl); err != nil {
			errs = append(errs, fmt.Sprintf("%s: %s", name, err))
		} else if ok {
			deleted = append(deleted, name)
		}
	}
	return json.NewEncoder(w).Encode(map[string]interface{}{
		"deleted": deleted,
		"errors":  errs,
	})
}
709
// maybeDelete deletes the cache file name if it has aged past ttl or if this
// origin no longer owns it per the hash ring. Persisted files first have any
// pending write-back tasks executed synchronously so the blob reaches the
// backend before local deletion. Returns whether the file was deleted.
func (s *Server) maybeDelete(name string, ttl time.Duration) (deleted bool, err error) {
	d, err := core.NewSHA256DigestFromHex(name)
	if err != nil {
		return false, fmt.Errorf("parse digest: %s", err)
	}
	info, err := s.cas.GetCacheFileStat(name)
	if err != nil {
		return false, fmt.Errorf("store: %s", err)
	}
	// Eligible when older than ttl, or when the hash ring no longer maps the
	// digest to this origin's address.
	expired := s.clk.Now().Sub(info.ModTime()) > ttl
	owns := stringset.FromSlice(s.hashRing.Locations(d)).Has(s.addr)
	if expired || !owns {
		// Ensure file is backed up properly before deleting.
		var pm metadata.Persist
		if err := s.cas.GetCacheFileMetadata(name, &pm); err != nil && !os.IsNotExist(err) {
			return false, fmt.Errorf("store: %s", err)
		}
		if pm.Value {
			// Note: It is possible that no writeback tasks exist, but the file
			// is persisted. We classify this as a leaked file which is safe to
			// delete.
			tasks, err := s.writeBackManager.Find(writeback.NewNameQuery(name))
			if err != nil {
				return false, fmt.Errorf("find writeback tasks: %s", err)
			}
			for _, task := range tasks {
				// Run the write-back inline; abort the delete if it fails.
				if err := s.writeBackManager.SyncExec(task); err != nil {
					return false, fmt.Errorf("writeback: %s", err)
				}
			}
			if err := s.cas.DeleteCacheFileMetadata(name, &metadata.Persist{}); err != nil {
				return false, fmt.Errorf("delete persist: %s", err)
			}
		}
		if err := s.cas.DeleteCacheFile(name); err != nil {
			return false, fmt.Errorf("delete: %s", err)
		}
		return true, nil
	}
	return false, nil
}
751