// Copyright (c) 2016-2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package blobserver

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	_ "net/http/pprof" // Registers /debug/pprof endpoints in http.DefaultServeMux.
	"os"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/uber/kraken/core"
	"github.com/uber/kraken/lib/backend"
	"github.com/uber/kraken/lib/backend/backenderrors"
	"github.com/uber/kraken/lib/blobrefresh"
	"github.com/uber/kraken/lib/hashring"
	"github.com/uber/kraken/lib/metainfogen"
	"github.com/uber/kraken/lib/middleware"
	"github.com/uber/kraken/lib/persistedretry"
	"github.com/uber/kraken/lib/persistedretry/writeback"
	"github.com/uber/kraken/lib/store"
	"github.com/uber/kraken/lib/store/metadata"
	"github.com/uber/kraken/origin/blobclient"
	"github.com/uber/kraken/utils/errutil"
	"github.com/uber/kraken/utils/handler"
	"github.com/uber/kraken/utils/httputil"
	"github.com/uber/kraken/utils/listener"
	"github.com/uber/kraken/utils/log"
	"github.com/uber/kraken/utils/memsize"
	"github.com/uber/kraken/utils/stringset"

	"github.com/andres-erbsen/clock"
	"github.com/go-chi/chi"
	"github.com/uber-go/tally"
)

const _uploadChunkSize = 16 * memsize.MB

// Server defines a server that serves blob data for agents.
type Server struct {
	config            Config
	stats             tally.Scope
	clk               clock.Clock
	addr              string
	hashRing          hashring.Ring
	cas               *store.CAStore
	clientProvider    blobclient.Provider
	clusterProvider   blobclient.ClusterProvider
	backends          *backend.Manager
	blobRefresher     *blobrefresh.Refresher
	metaInfoGenerator *metainfogen.Generator
	uploader          *uploader
	writeBackManager  persistedretry.Manager

	// This is an unfortunate coupling between the p2p client and the blob server.
	// Tracker queries the origin cluster to discover which origins can seed
	// a given torrent, however this requires blob server to understand the
	// context of the p2p client running alongside it.
	pctx core.PeerContext
}

// New initializes a new Server.
func New(
	config Config,
	stats tally.Scope,
	clk clock.Clock,
	addr string,
	hashRing hashring.Ring,
	cas *store.CAStore,
	clientProvider blobclient.Provider,
	clusterProvider blobclient.ClusterProvider,
	pctx core.PeerContext,
	backends *backend.Manager,
	blobRefresher *blobrefresh.Refresher,
	metaInfoGenerator *metainfogen.Generator,
	writeBackManager persistedretry.Manager) (*Server, error) {

	config = config.applyDefaults()

	stats = stats.Tagged(map[string]string{
		"module": "blobserver",
	})

	return &Server{
		config:            config,
		stats:             stats,
		clk:               clk,
		addr:              addr,
		hashRing:          hashRing,
		cas:               cas,
		clientProvider:    clientProvider,
		clusterProvider:   clusterProvider,
		backends:          backends,
		blobRefresher:     blobRefresher,
		metaInfoGenerator: metaInfoGenerator,
		uploader:          newUploader(cas),
		writeBackManager:  writeBackManager,
		pctx:              pctx,
	}, nil
}

// Addr returns the address the blob server is configured on.
func (s *Server) Addr() string {
	return s.addr
}

// Handler returns an http handler for the blob server.
func (s *Server) Handler() http.Handler {
	r := chi.NewRouter()

	r.Use(middleware.StatusCounter(s.stats))
	r.Use(middleware.LatencyTimer(s.stats))

	// Public endpoints:

	r.Get("/health", handler.Wrap(s.healthCheckHandler))

	r.Get("/blobs/{digest}/locations", handler.Wrap(s.getLocationsHandler))

	r.Post("/namespace/{namespace}/blobs/{digest}/uploads", handler.Wrap(s.startClusterUploadHandler))
	r.Patch("/namespace/{namespace}/blobs/{digest}/uploads/{uid}", handler.Wrap(s.patchClusterUploadHandler))
	r.Put("/namespace/{namespace}/blobs/{digest}/uploads/{uid}", handler.Wrap(s.commitClusterUploadHandler))

	r.Get("/namespace/{namespace}/blobs/{digest}", handler.Wrap(s.downloadBlobHandler))

	r.Post("/namespace/{namespace}/blobs/{digest}/remote/{remote}", handler.Wrap(s.replicateToRemoteHandler))

	r.Post("/forcecleanup", handler.Wrap(s.forceCleanupHandler))

	// Internal endpoints:

	r.Post("/internal/blobs/{digest}/uploads", handler.Wrap(s.startTransferHandler))
	r.Patch("/internal/blobs/{digest}/uploads/{uid}", handler.Wrap(s.patchTransferHandler))
	r.Put("/internal/blobs/{digest}/uploads/{uid}", handler.Wrap(s.commitTransferHandler))

	r.Delete("/internal/blobs/{digest}", handler.Wrap(s.deleteBlobHandler))

	r.Post("/internal/blobs/{digest}/metainfo", handler.Wrap(s.overwriteMetaInfoHandler))

	r.Get("/internal/peercontext", handler.Wrap(s.getPeerContextHandler))

	r.Head("/internal/namespace/{namespace}/blobs/{digest}", handler.Wrap(s.statHandler))

	r.Get("/internal/namespace/{namespace}/blobs/{digest}/metainfo", handler.Wrap(s.getMetaInfoHandler))

	r.Put(
		"/internal/duplicate/namespace/{namespace}/blobs/{digest}/uploads/{uid}",
		handler.Wrap(s.duplicateCommitClusterUploadHandler))

	r.Mount("/", http.DefaultServeMux) // Serves /debug/pprof endpoints.

	return r
}

// ListenAndServe is a blocking call which runs s.
func (s *Server) ListenAndServe(h http.Handler) error {
	log.Infof("Starting blob server on %s", s.config.Listener)
	return listener.Serve(s.config.Listener, h)
}
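
// Wiring sketch (added for illustration, not part of the original file): how a
// caller is expected to combine New, Handler, and ListenAndServe. All of the
// constructor arguments (config, stats, ring, cas, providers, pctx, backends,
// refresher, generator, writeBackManager) are assumed to be built elsewhere,
// and clock.New() is assumed to return the real clock from the clock package.
//
//	s, err := blobserver.New(
//		config, stats, clock.New(), addr, ring, cas,
//		provider, clusterProvider, pctx, backends,
//		refresher, generator, writeBackManager)
//	if err != nil {
//		log.Errorf("init blob server: %s", err)
//		return
//	}
//	if err := s.ListenAndServe(s.Handler()); err != nil {
//		log.Errorf("blob server exited: %s", err)
//	}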

func (s *Server) healthCheckHandler(w http.ResponseWriter, r *http.Request) error {
	fmt.Fprintln(w, "OK")
	return nil
}

// statHandler returns blob info if it exists.
func (s *Server) statHandler(w http.ResponseWriter, r *http.Request) error {
	checkLocal, err := strconv.ParseBool(httputil.GetQueryArg(r, "local", "false"))
	if err != nil {
		return handler.Errorf("parse arg `local` as bool: %s", err)
	}
	namespace, err := httputil.ParseParam(r, "namespace")
	if err != nil {
		return err
	}
	d, err := httputil.ParseDigest(r, "digest")
	if err != nil {
		return err
	}

	bi, err := s.stat(namespace, d, checkLocal)
	if os.IsNotExist(err) {
		return handler.ErrorStatus(http.StatusNotFound)
	} else if err != nil {
		return fmt.Errorf("stat: %s", err)
	}
	w.Header().Set("Content-Length", strconv.FormatInt(bi.Size, 10))
	log.Debugf("successfully check blob %s exists", d.Hex())
	return nil
}

func (s *Server) stat(namespace string, d core.Digest, checkLocal bool) (*core.BlobInfo, error) {
	fi, err := s.cas.GetCacheFileStat(d.Hex())
	if err == nil {
		return core.NewBlobInfo(fi.Size()), nil
	} else if os.IsNotExist(err) {
		if !checkLocal {
			client, err := s.backends.GetClient(namespace)
			if err != nil {
				return nil, fmt.Errorf("get backend client: %s", err)
			}
			if bi, err := client.Stat(namespace, d.Hex()); err == nil {
				return bi, nil
			} else if err == backenderrors.ErrBlobNotFound {
				return nil, os.ErrNotExist
			} else {
				return nil, fmt.Errorf("backend stat: %s", err)
			}
		}
		return nil, err // os.ErrNotExist
	}

	return nil, fmt.Errorf("stat cache file: %s", err)
}
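
// Usage sketch (added for illustration, not part of the original file):
// statHandler is mounted as HEAD /internal/namespace/{namespace}/blobs/{digest}.
// With local=false (the default) a local cache miss falls through to the
// namespace's backend; local=true consults only the local cache.
//
//	HEAD /internal/namespace/foo/blobs/<sha256-hex>            -> 200 with Content-Length, or 404
//	HEAD /internal/namespace/foo/blobs/<sha256-hex>?local=true -> 200 only if cached on this origin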

func (s *Server) downloadBlobHandler(w http.ResponseWriter, r *http.Request) error {
	namespace, err := httputil.ParseParam(r, "namespace")
	if err != nil {
		return err
	}
	d, err := httputil.ParseDigest(r, "digest")
	if err != nil {
		return err
	}
	if err := s.downloadBlob(namespace, d, w); err != nil {
		return err
	}
	setOctetStreamContentType(w)
	return nil
}

func (s *Server) replicateToRemoteHandler(w http.ResponseWriter, r *http.Request) error {
	namespace, err := httputil.ParseParam(r, "namespace")
	if err != nil {
		return err
	}
	d, err := httputil.ParseDigest(r, "digest")
	if err != nil {
		return err
	}
	remote, err := httputil.ParseParam(r, "remote")
	if err != nil {
		return err
	}
	return s.replicateToRemote(namespace, d, remote)
}

func (s *Server) replicateToRemote(namespace string, d core.Digest, remoteDNS string) error {
	f, err := s.cas.GetCacheFileReader(d.Hex())
	if err != nil {
		if os.IsNotExist(err) {
			return s.startRemoteBlobDownload(namespace, d, false)
		}
		return handler.Errorf("file store: %s", err)
	}
	defer f.Close()

	remote, err := s.clusterProvider.Provide(remoteDNS)
	if err != nil {
		return handler.Errorf("remote cluster provider: %s", err)
	}
	return remote.UploadBlob(namespace, d, f)
}

// deleteBlobHandler deletes blob data.
func (s *Server) deleteBlobHandler(w http.ResponseWriter, r *http.Request) error {
	d, err := httputil.ParseDigest(r, "digest")
	if err != nil {
		return err
	}
	if err := s.deleteBlob(d); err != nil {
		return err
	}
	setContentLength(w, 0)
	w.WriteHeader(http.StatusAccepted)
	log.Debugf("successfully delete blob %s", d.Hex())
	return nil
}

func (s *Server) getLocationsHandler(w http.ResponseWriter, r *http.Request) error {
	d, err := httputil.ParseDigest(r, "digest")
	if err != nil {
		return err
	}
	locs := s.hashRing.Locations(d)
	w.Header().Set("Origin-Locations", strings.Join(locs, ","))
	w.WriteHeader(http.StatusOK)
	return nil
}

// getPeerContextHandler returns the Server's peer context as JSON.
func (s *Server) getPeerContextHandler(w http.ResponseWriter, r *http.Request) error {
	if err := json.NewEncoder(w).Encode(s.pctx); err != nil {
		return handler.Errorf("error converting peer context to json: %s", err)
	}
	return nil
}

func (s *Server) getMetaInfoHandler(w http.ResponseWriter, r *http.Request) error {
	namespace, err := httputil.ParseParam(r, "namespace")
	if err != nil {
		return err
	}
	d, err := httputil.ParseDigest(r, "digest")
	if err != nil {
		return err
	}
	raw, err := s.getMetaInfo(namespace, d)
	if err != nil {
		return err
	}
	w.Write(raw)
	return nil
}

func (s *Server) overwriteMetaInfoHandler(w http.ResponseWriter, r *http.Request) error {
	d, err := httputil.ParseDigest(r, "digest")
	if err != nil {
		return err
	}
	pieceLength, err := strconv.ParseInt(r.URL.Query().Get("piece_length"), 10, 64)
	if err != nil {
		return handler.Errorf("invalid piece_length argument: %s", err).Status(http.StatusBadRequest)
	}
	return s.overwriteMetaInfo(d, pieceLength)
}

// overwriteMetaInfo generates metainfo configured with pieceLength for d and
// writes it to disk, overwriting any existing metainfo. Primarily intended for
// benchmarking purposes.
func (s *Server) overwriteMetaInfo(d core.Digest, pieceLength int64) error {
	f, err := s.cas.GetCacheFileReader(d.Hex())
	if err != nil {
		return handler.Errorf("get cache file: %s", err)
	}
	mi, err := core.NewMetaInfo(d, f, pieceLength)
	if err != nil {
		return handler.Errorf("create metainfo: %s", err)
	}
	if _, err := s.cas.SetCacheFileMetadata(d.Hex(), metadata.NewTorrentMeta(mi)); err != nil {
		return handler.Errorf("set metainfo: %s", err)
	}
	return nil
}

// getMetaInfo returns metainfo for d. If no blob exists under d, a download of
// the blob from the storage backend configured for namespace will be initiated.
// This download is asynchronous and getMetaInfo will immediately return a
// "202 Accepted" server error.
func (s *Server) getMetaInfo(namespace string, d core.Digest) ([]byte, error) {
	var tm metadata.TorrentMeta
	if err := s.cas.GetCacheFileMetadata(d.Hex(), &tm); os.IsNotExist(err) {
		return nil, s.startRemoteBlobDownload(namespace, d, true)
	} else if err != nil {
		return nil, handler.Errorf("get cache metadata: %s", err)
	}
	return tm.Serialize()
}
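
// Client-side sketch (added for illustration, not part of the original file):
// because a cache miss returns 202 Accepted while the backend download runs
// asynchronously, a caller of the metainfo endpoint is expected to poll until
// the blob lands in the cache. metaInfoURL and pollInterval are hypothetical
// names for the request URL and backoff chosen by the caller.
//
//	for {
//		resp, err := http.Get(metaInfoURL) // GET /internal/namespace/{namespace}/blobs/{digest}/metainfo
//		if err != nil {
//			return nil, err
//		}
//		if resp.StatusCode == http.StatusAccepted {
//			resp.Body.Close()
//			time.Sleep(pollInterval) // download still in flight; try again later
//			continue
//		}
//		defer resp.Body.Close()
//		return io.ReadAll(resp.Body) // caller still checks for non-200 statuses
//	}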

type localReplicationHook struct {
	server *Server
}

func (h *localReplicationHook) Run(d core.Digest) {
	timer := h.server.stats.Timer("replicate_blob").Start()
	if err := h.server.replicateBlobLocally(d); err != nil {
		// Don't return error here as we only want to cache storage backend errors.
		log.With("blob", d.Hex()).Errorf("Error replicating remote blob: %s", err)
		h.server.stats.Counter("replicate_blob_errors").Inc(1)
		return
	}
	timer.Stop()
}

func (s *Server) startRemoteBlobDownload(
	namespace string, d core.Digest, replicateLocally bool) error {

	var hooks []blobrefresh.PostHook
	if replicateLocally {
		hooks = append(hooks, &localReplicationHook{s})
	}
	err := s.blobRefresher.Refresh(namespace, d, hooks...)
	switch err {
	case blobrefresh.ErrPending, nil:
		return handler.ErrorStatus(http.StatusAccepted)
	case blobrefresh.ErrNotFound:
		return handler.ErrorStatus(http.StatusNotFound)
	case blobrefresh.ErrWorkersBusy:
		return handler.ErrorStatus(http.StatusServiceUnavailable)
	default:
		return err
	}
}

func (s *Server) replicateBlobLocally(d core.Digest) error {
	return s.applyToReplicas(d, func(i int, client blobclient.Client) error {
		f, err := s.cas.GetCacheFileReader(d.Hex())
		if err != nil {
			return fmt.Errorf("get cache reader: %s", err)
		}
		if err := client.TransferBlob(d, f); err != nil {
			return fmt.Errorf("transfer blob: %s", err)
		}
		return nil
	})
}

// applyToReplicas applies f to the replicas of d concurrently in random order,
// not including the current origin. Passes the index of the iteration to f.
func (s *Server) applyToReplicas(d core.Digest, f func(i int, c blobclient.Client) error) error {
	replicas := stringset.FromSlice(s.hashRing.Locations(d))
	replicas.Remove(s.addr)

	var mu sync.Mutex
	var errs []error

	var wg sync.WaitGroup
	var i int
	for replica := range replicas {
		wg.Add(1)
		go func(i int, replica string) {
			defer wg.Done()
			if err := f(i, s.clientProvider.Provide(replica)); err != nil {
				mu.Lock()
				errs = append(errs, err)
				mu.Unlock()
			}
		}(i, replica)
		i++
	}
	wg.Wait()

	return errutil.Join(errs)
}
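
// (Note added for clarity, not in the original file: the "random order" above
// falls out of ranging over the stringset, i.e. Go map iteration order, so the
// index i passed to f is not tied to any particular replica address.)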

// downloadBlob downloads blob for d into dst. If no blob exists under d, a
// download of the blob from the storage backend configured for namespace will
// be initiated. This download is asynchronous and downloadBlob will immediately
// return a "202 Accepted" handler error.
func (s *Server) downloadBlob(namespace string, d core.Digest, dst io.Writer) error {
	f, err := s.cas.GetCacheFileReader(d.Hex())
	if os.IsNotExist(err) {
		return s.startRemoteBlobDownload(namespace, d, true)
	} else if err != nil {
		return handler.Errorf("get cache file: %s", err)
	}
	defer f.Close()

	if _, err := io.Copy(dst, f); err != nil {
		return handler.Errorf("copy blob: %s", err)
	}
	return nil
}

func (s *Server) deleteBlob(d core.Digest) error {
	if err := s.cas.DeleteCacheFile(d.Hex()); err != nil {
		if os.IsNotExist(err) {
			return handler.ErrorStatus(http.StatusNotFound)
		}
		return handler.Errorf("cannot delete blob data for digest %q: %s", d, err)
	}
	return nil
}

// startTransferHandler initializes an upload for internal blob transfers.
func (s *Server) startTransferHandler(w http.ResponseWriter, r *http.Request) error {
	d, err := httputil.ParseDigest(r, "digest")
	if err != nil {
		return err
	}
	if ok, err := blobExists(s.cas, d); err != nil {
		return handler.Errorf("check blob: %s", err)
	} else if ok {
		return handler.ErrorStatus(http.StatusConflict)
	}
	uid, err := s.uploader.start(d)
	if err != nil {
		return err
	}
	setUploadLocation(w, uid)
	w.WriteHeader(http.StatusOK)
	return nil
}

// patchTransferHandler uploads a chunk of a blob for internal uploads.
func (s *Server) patchTransferHandler(w http.ResponseWriter, r *http.Request) error {
	d, err := httputil.ParseDigest(r, "digest")
	if err != nil {
		return err
	}
	uid, err := httputil.ParseParam(r, "uid")
	if err != nil {
		return err
	}
	start, end, err := parseContentRange(r.Header)
	if err != nil {
		return err
	}
	return s.uploader.patch(d, uid, r.Body, start, end)
}

// commitTransferHandler commits the upload of an internal blob transfer.
// Internal blob transfers are not replicated to the rest of the cluster.
func (s *Server) commitTransferHandler(w http.ResponseWriter, r *http.Request) error {
	d, err := httputil.ParseDigest(r, "digest")
	if err != nil {
		return err
	}
	uid, err := httputil.ParseParam(r, "uid")
	if err != nil {
		return err
	}
	if err := s.uploader.commit(d, uid); err != nil {
		return err
	}
	if err := s.metaInfoGenerator.Generate(d); err != nil {
		return handler.Errorf("generate metainfo: %s", err)
	}
	return nil
}

func (s *Server) handleUploadConflict(err error, namespace string, d core.Digest) error {
	if herr, ok := err.(*handler.Error); ok && herr.GetStatus() == http.StatusConflict {
		// Even if the blob was already uploaded and committed to cache, it's
		// still possible that adding the write-back task failed. Clients short
		// circuit on conflict and return success, so we must make sure that if we
		// tell a client to stop before commit, the blob has been written back.
		if err := s.writeBack(namespace, d, 0); err != nil {
			return err
		}
	}
	return err
}

// startClusterUploadHandler initializes an upload for external uploads.
func (s *Server) startClusterUploadHandler(w http.ResponseWriter, r *http.Request) error {
	d, err := httputil.ParseDigest(r, "digest")
	if err != nil {
		return err
	}
	namespace, err := httputil.ParseParam(r, "namespace")
	if err != nil {
		return err
	}
	uid, err := s.uploader.start(d)
	if err != nil {
		return s.handleUploadConflict(err, namespace, d)
	}
	setUploadLocation(w, uid)
	w.WriteHeader(http.StatusOK)
	return nil
}

// patchClusterUploadHandler uploads a chunk of a blob for external uploads.
func (s *Server) patchClusterUploadHandler(w http.ResponseWriter, r *http.Request) error {
	d, err := httputil.ParseDigest(r, "digest")
	if err != nil {
		return err
	}
	namespace, err := httputil.ParseParam(r, "namespace")
	if err != nil {
		return err
	}
	uid, err := httputil.ParseParam(r, "uid")
	if err != nil {
		return err
	}
	start, end, err := parseContentRange(r.Header)
	if err != nil {
		return err
	}
	if err := s.uploader.patch(d, uid, r.Body, start, end); err != nil {
		return s.handleUploadConflict(err, namespace, d)
	}
	return nil
}

// commitClusterUploadHandler commits an external blob upload asynchronously,
// meaning the blob will be written back to remote storage in a non-blocking
// fashion.
func (s *Server) commitClusterUploadHandler(w http.ResponseWriter, r *http.Request) error {
	d, err := httputil.ParseDigest(r, "digest")
	if err != nil {
		return err
	}
	namespace, err := httputil.ParseParam(r, "namespace")
	if err != nil {
		return err
	}
	uid, err := httputil.ParseParam(r, "uid")
	if err != nil {
		return err
	}

	if err := s.uploader.commit(d, uid); err != nil {
		return s.handleUploadConflict(err, namespace, d)
	}
	if err := s.writeBack(namespace, d, 0); err != nil {
		return err
	}
	err = s.applyToReplicas(d, func(i int, client blobclient.Client) error {
		delay := s.config.DuplicateWriteBackStagger * time.Duration(i+1)
		f, err := s.cas.GetCacheFileReader(d.Hex())
		if err != nil {
			return fmt.Errorf("get cache file: %s", err)
		}
		if err := client.DuplicateUploadBlob(namespace, d, f, delay); err != nil {
			return fmt.Errorf("duplicate upload: %s", err)
		}
		return nil
	})
	if err != nil {
		s.stats.Counter("duplicate_write_back_errors").Inc(1)
		log.Errorf("Error duplicating write-back task to replicas: %s", err)
	}
	return nil
}
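
// Protocol sketch (added for illustration, not part of the original file): the
// external ("cluster") upload flow driven by the three handlers above. The
// header that carries the upload id is set by setUploadLocation (defined
// elsewhere in this package), so it is only shown schematically here.
//
//	POST  /namespace/{namespace}/blobs/{digest}/uploads       -> 200, response carries the upload id {uid}
//	PATCH /namespace/{namespace}/blobs/{digest}/uploads/{uid} -> one request per chunk, positioned by a Content-Range header
//	PUT   /namespace/{namespace}/blobs/{digest}/uploads/{uid} -> commits: persists, schedules write-back, duplicates to replicas
//
// A 409 Conflict at any step means the blob is already committed to the cache;
// before surfacing it, handleUploadConflict enqueues a write-back so a
// short-circuiting client does not leave the blob un-backed-up.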

// duplicateCommitClusterUploadHandler commits a duplicate blob upload, which
// will attempt to write-back after the requested delay.
func (s *Server) duplicateCommitClusterUploadHandler(w http.ResponseWriter, r *http.Request) error {
	d, err := httputil.ParseDigest(r, "digest")
	if err != nil {
		return err
	}
	namespace, err := httputil.ParseParam(r, "namespace")
	if err != nil {
		return err
	}
	uid, err := httputil.ParseParam(r, "uid")
	if err != nil {
		return err
	}

	var dr blobclient.DuplicateCommitUploadRequest
	if err := json.NewDecoder(r.Body).Decode(&dr); err != nil {
		return handler.Errorf("decode body: %s", err)
	}
	delay := dr.Delay

	if err := s.uploader.commit(d, uid); err != nil {
		return err
	}
	return s.writeBack(namespace, d, delay)
}

func (s *Server) writeBack(namespace string, d core.Digest, delay time.Duration) error {
	if _, err := s.cas.SetCacheFileMetadata(d.Hex(), metadata.NewPersist(true)); err != nil {
		return handler.Errorf("set persist metadata: %s", err)
	}
	task := writeback.NewTask(namespace, d.Hex(), delay)
	if err := s.writeBackManager.Add(task); err != nil {
		return handler.Errorf("add write-back task: %s", err)
	}
	if err := s.metaInfoGenerator.Generate(d); err != nil {
		return handler.Errorf("generate metainfo: %s", err)
	}
	return nil
}

func (s *Server) forceCleanupHandler(w http.ResponseWriter, r *http.Request) error {
	// Note, this API is intended to be executed manually (i.e. curl), hence the
	// query arguments, usage of hours instead of nanoseconds, and JSON response
	// enumerating deleted files / errors.

	rawTTLHr := r.URL.Query().Get("ttl_hr")
	if rawTTLHr == "" {
		return handler.Errorf("query arg ttl_hr required").Status(http.StatusBadRequest)
	}
	ttlHr, err := strconv.Atoi(rawTTLHr)
	if err != nil {
		return handler.Errorf("invalid ttl_hr: %s", err).Status(http.StatusBadRequest)
	}
	ttl := time.Duration(ttlHr) * time.Hour

	names, err := s.cas.ListCacheFiles()
	if err != nil {
		return err
	}
	var errs, deleted []string
	for _, name := range names {
		if ok, err := s.maybeDelete(name, ttl); err != nil {
			errs = append(errs, fmt.Sprintf("%s: %s", name, err))
		} else if ok {
			deleted = append(deleted, name)
		}
	}
	return json.NewEncoder(w).Encode(map[string]interface{}{
		"deleted": deleted,
		"errors":  errs,
	})
}
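
// Manual invocation sketch (added for illustration, not part of the original
// file), in the spirit of the curl note above. Host and port are placeholders:
//
//	curl -X POST "http://<origin-host>:<port>/forcecleanup?ttl_hr=24"
//
// The JSON response lists deleted cache entries and any per-file errors.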

func (s *Server) maybeDelete(name string, ttl time.Duration) (deleted bool, err error) {
	d, err := core.NewSHA256DigestFromHex(name)
	if err != nil {
		return false, fmt.Errorf("parse digest: %s", err)
	}
	info, err := s.cas.GetCacheFileStat(name)
	if err != nil {
		return false, fmt.Errorf("store: %s", err)
	}
	expired := s.clk.Now().Sub(info.ModTime()) > ttl
	owns := stringset.FromSlice(s.hashRing.Locations(d)).Has(s.addr)
	if expired || !owns {
		// Ensure file is backed up properly before deleting.
		var pm metadata.Persist
		if err := s.cas.GetCacheFileMetadata(name, &pm); err != nil && !os.IsNotExist(err) {
			return false, fmt.Errorf("store: %s", err)
		}
		if pm.Value {
			// Note: It is possible that no writeback tasks exist, but the file
			// is persisted. We classify this as a leaked file which is safe to
			// delete.
			tasks, err := s.writeBackManager.Find(writeback.NewNameQuery(name))
			if err != nil {
				return false, fmt.Errorf("find writeback tasks: %s", err)
			}
			for _, task := range tasks {
				if err := s.writeBackManager.SyncExec(task); err != nil {
					return false, fmt.Errorf("writeback: %s", err)
				}
			}
			if err := s.cas.DeleteCacheFileMetadata(name, &metadata.Persist{}); err != nil {
				return false, fmt.Errorf("delete persist: %s", err)
			}
		}
		if err := s.cas.DeleteCacheFile(name); err != nil {
			return false, fmt.Errorf("delete: %s", err)
		}
		return true, nil
	}
	return false, nil
}
