kraken
803 строки · 21.7 Кб
// Copyright (c) 2016-2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14package blobserver15
16import (17"bytes"18"errors"19"fmt"20"io/ioutil"21"net/http"22"testing"23"time"24
25"github.com/golang/mock/gomock"26"github.com/stretchr/testify/require"27
28"github.com/uber/kraken/core"29"github.com/uber/kraken/lib/backend/backenderrors"30"github.com/uber/kraken/lib/persistedretry"31"github.com/uber/kraken/lib/persistedretry/writeback"32"github.com/uber/kraken/lib/store/metadata"33"github.com/uber/kraken/origin/blobclient"34"github.com/uber/kraken/utils/httputil"35"github.com/uber/kraken/utils/mockutil"36"github.com/uber/kraken/utils/testutil"37)
38
39func TestHealth(t *testing.T) {40require := require.New(t)41
42cp := newTestClientProvider()43
44s := newTestServer(t, master1, hashRingMaxReplica(), cp)45defer s.cleanup()46
47resp, err := httputil.Get(48fmt.Sprintf("http://%s/health", s.addr))49defer resp.Body.Close()50require.NoError(err)51b, err := ioutil.ReadAll(resp.Body)52require.NoError(err)53require.Equal("OK\n", string(b))54}
55
56func TestStatHandlerLocalNotFound(t *testing.T) {57require := require.New(t)58
59cp := newTestClientProvider()60
61s := newTestServer(t, master1, hashRingMaxReplica(), cp)62defer s.cleanup()63
64d := core.DigestFixture()65namespace := core.TagFixture()66
67_, err := cp.Provide(s.host).StatLocal(namespace, d)68require.Equal(blobclient.ErrBlobNotFound, err)69}
70
71func TestStatHandlerInvalidParam(t *testing.T) {72digest := core.DigestFixture()73
74tests := []struct {75desc string76path string77status int78}{79{80"empty namespace",81fmt.Sprintf("internal/namespace//blobs/%s", digest),82http.StatusBadRequest,83}, {84"invalid digest",85"internal/namespace/foo/blobs/bar",86http.StatusBadRequest,87}, {88"invalid local param",89fmt.Sprintf("internal/namespace/foo/blobs/%s?local=bar", digest),90http.StatusInternalServerError,91},92}93for _, test := range tests {94t.Run(test.desc, func(t *testing.T) {95require := require.New(t)96
97cp := newTestClientProvider()98
99s := newTestServer(t, master1, hashRingMaxReplica(), cp)100defer s.cleanup()101
102_, err := httputil.Head(fmt.Sprintf("http://%s/%s", s.addr, test.path))103require.Error(err)104require.True(httputil.IsStatus(err, test.status))105})106}107}
108
109func TestStatHandlerNotFound(t *testing.T) {110require := require.New(t)111
112cp := newTestClientProvider()113
114s := newTestServer(t, master1, hashRingMaxReplica(), cp)115defer s.cleanup()116
117d := core.DigestFixture()118namespace := core.TagFixture()119
120backendClient := s.backendClient(namespace)121
122backendClient.EXPECT().Stat(namespace, d.Hex()).Return(nil, backenderrors.ErrBlobNotFound)123
124_, err := cp.Provide(master1).Stat(namespace, d)125require.Equal(blobclient.ErrBlobNotFound, err)126}
127
128func TestStatHandlerReturnSize(t *testing.T) {129require := require.New(t)130
131cp := newTestClientProvider()132
133s := newTestServer(t, master1, hashRingMaxReplica(), cp)134defer s.cleanup()135
136client := cp.Provide(s.host)137blob := core.SizedBlobFixture(256, 8)138namespace := core.TagFixture()139
140require.NoError(client.TransferBlob(blob.Digest, bytes.NewReader(blob.Content)))141
142ensureHasBlob(t, cp.Provide(s.host), namespace, blob)143
144bi, err := cp.Provide(master1).Stat(namespace, blob.Digest)145require.NoError(err)146require.NotNil(bi)147require.Equal(int64(256), bi.Size)148}
149
150func TestDownloadBlobInvalidParam(t *testing.T) {151digest := core.DigestFixture()152
153tests := []struct {154desc string155path string156status int157}{158{159"empty namespace",160fmt.Sprintf("namespace//blobs/%s", digest),161http.StatusBadRequest,162}, {163"invalid digest",164"namespace/foo/blobs/bar",165http.StatusBadRequest,166},167}168for _, test := range tests {169t.Run(test.desc, func(t *testing.T) {170require := require.New(t)171
172cp := newTestClientProvider()173
174s := newTestServer(t, master1, hashRingMaxReplica(), cp)175defer s.cleanup()176
177_, err := httputil.Get(fmt.Sprintf("http://%s/%s", s.addr, test.path))178require.Error(err)179require.True(httputil.IsStatus(err, test.status))180})181}182}
183
184func TestDownloadBlobNotFound(t *testing.T) {185require := require.New(t)186
187cp := newTestClientProvider()188
189s := newTestServer(t, master1, hashRingMaxReplica(), cp)190defer s.cleanup()191
192d := core.DigestFixture()193namespace := core.TagFixture()194
195backendClient := s.backendClient(namespace)196backendClient.EXPECT().Stat(namespace, d.Hex()).Return(nil, backenderrors.ErrBlobNotFound)197
198err := cp.Provide(master1).DownloadBlob(namespace, d, ioutil.Discard)199require.Error(err)200require.Equal(http.StatusNotFound, err.(httputil.StatusError).Status)201}
202
203func TestDeleteBlob(t *testing.T) {204require := require.New(t)205
206cp := newTestClientProvider()207
208s := newTestServer(t, master1, hashRingMaxReplica(), cp)209defer s.cleanup()210
211client := cp.Provide(s.host)212
213blob := core.NewBlobFixture()214namespace := core.TagFixture()215
216require.NoError(client.TransferBlob(blob.Digest, bytes.NewReader(blob.Content)))217
218ensureHasBlob(t, cp.Provide(s.host), namespace, blob)219
220require.NoError(client.DeleteBlob(blob.Digest))221
222_, err := client.StatLocal(namespace, blob.Digest)223require.Equal(blobclient.ErrBlobNotFound, err)224}
225
226func TestDeleteBlobInvalidParam(t *testing.T) {227require := require.New(t)228
229cp := newTestClientProvider()230
231s := newTestServer(t, master1, hashRingMaxReplica(), cp)232defer s.cleanup()233
234_, err := httputil.Delete(fmt.Sprintf("http://%s/internal/blobs/foo", s.addr))235require.Error(err)236require.True(httputil.IsStatus(err, http.StatusBadRequest))237}
238
239func TestGetLocationsOK(t *testing.T) {240require := require.New(t)241
242cp := newTestClientProvider()243ring := hashRingSomeReplica()244
245s := newTestServer(t, master1, ring, cp)246defer s.cleanup()247
248blob := computeBlobForHosts(ring, master1, master2)249
250locs, err := cp.Provide(s.host).Locations(blob.Digest)251require.NoError(err)252require.ElementsMatch([]string{master1, master2}, locs)253}
254
255func TestGetPeerContextOK(t *testing.T) {256require := require.New(t)257
258cp := newTestClientProvider()259
260s := newTestServer(t, master1, hashRingSomeReplica(), cp)261defer s.cleanup()262
263pctx, err := cp.Provide(master1).GetPeerContext()264require.NoError(err)265require.Equal(s.pctx, pctx)266}
267
268func TestGetMetaInfoDownloadsBlobAndReplicates(t *testing.T) {269require := require.New(t)270
271ring := hashRingSomeReplica()272cp := newTestClientProvider()273namespace := core.TagFixture()274
275s1 := newTestServer(t, master1, ring, cp)276defer s1.cleanup()277
278s2 := newTestServer(t, master2, ring, cp)279defer s2.cleanup()280
281blob := computeBlobForHosts(ring, s1.host, s2.host)282
283backendClient := s1.backendClient(namespace)284backendClient.EXPECT().Stat(namespace,285blob.Digest.Hex()).Return(core.NewBlobInfo(int64(len(blob.Content))), nil).AnyTimes()286backendClient.EXPECT().Download(namespace, blob.Digest.Hex(), mockutil.MatchWriter(blob.Content)).Return(nil)287
288mi, err := cp.Provide(master1).GetMetaInfo(namespace, blob.Digest)289require.True(httputil.IsAccepted(err))290require.Nil(mi)291
292require.NoError(testutil.PollUntilTrue(5*time.Second, func() bool {293_, err := cp.Provide(master1).GetMetaInfo(namespace, blob.Digest)294return !httputil.IsAccepted(err)295}))296
297mi, err = cp.Provide(master1).GetMetaInfo(namespace, blob.Digest)298require.NoError(err)299require.NotNil(mi)300require.Equal(len(blob.Content), int(mi.Length()))301
302// Ensure blob was replicated to other master.303require.NoError(testutil.PollUntilTrue(5*time.Second, func() bool {304_, err := cp.Provide(master2).StatLocal(namespace, blob.Digest)305return err == nil306}))307}
308
309func TestGetMetaInfoBlobNotFound(t *testing.T) {310require := require.New(t)311
312cp := newTestClientProvider()313
314s := newTestServer(t, master1, hashRingMaxReplica(), cp)315defer s.cleanup()316
317d := core.DigestFixture()318namespace := core.TagFixture()319
320backendClient := s.backendClient(namespace)321backendClient.EXPECT().Stat(namespace, d.Hex()).Return(nil, backenderrors.ErrBlobNotFound)322
323mi, err := cp.Provide(master1).GetMetaInfo(namespace, d)324require.True(httputil.IsNotFound(err))325require.Nil(mi)326}
327
328func TestGetMetaInfoInvalidParam(t *testing.T) {329digest := core.DigestFixture()330
331tests := []struct {332desc string333path string334status int335}{336{337"empty namespace",338fmt.Sprintf("internal/namespace//blobs/%s/metainfo", digest),339http.StatusBadRequest,340}, {341"invalid digest",342"internal/namespace/foo/blobs/bar/metainfo",343http.StatusBadRequest,344},345}346for _, test := range tests {347t.Run(test.desc, func(t *testing.T) {348require := require.New(t)349
350cp := newTestClientProvider()351
352s := newTestServer(t, master1, hashRingMaxReplica(), cp)353defer s.cleanup()354
355_, err := httputil.Get(fmt.Sprintf("http://%s/%s", s.addr, test.path))356require.Error(err)357require.True(httputil.IsStatus(err, test.status))358})359}360}
361
362func TestTransferBlob(t *testing.T) {363require := require.New(t)364
365cp := newTestClientProvider()366
367s := newTestServer(t, master1, hashRingMaxReplica(), cp)368defer s.cleanup()369
370blob := core.NewBlobFixture()371namespace := core.TagFixture()372
373err := cp.Provide(master1).TransferBlob(blob.Digest, bytes.NewReader(blob.Content))374require.NoError(err)375ensureHasBlob(t, cp.Provide(master1), namespace, blob)376
377// Ensure metainfo was generated.378var tm metadata.TorrentMeta379require.NoError(s.cas.GetCacheFileMetadata(blob.Digest.Hex(), &tm))380
381// Pushing again should be a no-op.382err = cp.Provide(master1).TransferBlob(blob.Digest, bytes.NewReader(blob.Content))383require.NoError(err)384ensureHasBlob(t, cp.Provide(master1), namespace, blob)385}
386
387func TestTransferBlobInvalidParam(t *testing.T) {388t.Run("StartInvalidDigest", func(t *testing.T) {389require := require.New(t)390
391cp := newTestClientProvider()392s := newTestServer(t, master1, hashRingMaxReplica(), cp)393defer s.cleanup()394
395_, err := httputil.Post(396fmt.Sprintf("http://%s/internal/blobs/foo/uploads", s.addr))397require.Error(err)398require.True(httputil.IsStatus(err, http.StatusBadRequest))399})400t.Run("PatchInvalidDigest", func(t *testing.T) {401require := require.New(t)402
403cp := newTestClientProvider()404s := newTestServer(t, master1, hashRingMaxReplica(), cp)405defer s.cleanup()406
407d := core.DigestFixture()408_, err := httputil.Post(409fmt.Sprintf("http://%s/internal/blobs/%s/uploads", s.addr, d.String()))410require.NoError(err)411_, err = httputil.Patch(412fmt.Sprintf("http://%s/internal/blobs/foo/uploads/bar", s.addr),413httputil.SendHeaders(map[string]string{414"Content-Range": fmt.Sprintf("%d-%d", 0, 0),415}))416require.Error(err)417require.True(httputil.IsStatus(err, http.StatusBadRequest))418})419t.Run("PatchNonexistentUploadUUID", func(t *testing.T) {420require := require.New(t)421
422cp := newTestClientProvider()423s := newTestServer(t, master1, hashRingMaxReplica(), cp)424defer s.cleanup()425
426d := core.DigestFixture()427_, err := httputil.Post(428fmt.Sprintf("http://%s/internal/blobs/%s/uploads", s.addr, d.String()))429require.NoError(err)430
431_, err = httputil.Patch(432fmt.Sprintf("http://%s/internal/blobs/%s/uploads/bar", s.addr, d.String()),433httputil.SendHeaders(map[string]string{434"Content-Range": fmt.Sprintf("%d-%d", 0, 0),435}))436require.Error(err)437require.True(httputil.IsStatus(err, http.StatusNotFound))438})439t.Run("CommitInvalidDigest", func(t *testing.T) {440require := require.New(t)441
442cp := newTestClientProvider()443s := newTestServer(t, master1, hashRingMaxReplica(), cp)444defer s.cleanup()445
446d := core.DigestFixture()447_, err := httputil.Post(448fmt.Sprintf("http://%s/internal/blobs/%s/uploads", s.addr, d.String()))449require.NoError(err)450
451_, err = httputil.Put(452fmt.Sprintf("http://%s/internal/blobs/foo/uploads/bar", s.addr))453require.Error(err)454require.True(httputil.IsStatus(err, http.StatusBadRequest))455})456t.Run("CommitNonexistentUploadUUID", func(t *testing.T) {457require := require.New(t)458
459cp := newTestClientProvider()460s := newTestServer(t, master1, hashRingMaxReplica(), cp)461defer s.cleanup()462
463d := core.DigestFixture()464_, err := httputil.Post(465fmt.Sprintf("http://%s/internal/blobs/%s/uploads", s.addr, d.String()))466require.NoError(err)467
468_, err = httputil.Put(469fmt.Sprintf("http://%s/internal/blobs/%s/uploads/bar", s.addr, d.String()))470require.Error(err)471fmt.Println(err)472require.True(httputil.IsStatus(err, http.StatusNotFound))473})474}
475
476func TestTransferBlobSmallChunkSize(t *testing.T) {477require := require.New(t)478
479s := newTestServer(t, master1, hashRingMaxReplica(), newTestClientProvider())480defer s.cleanup()481
482blob := core.SizedBlobFixture(1000, 1)483namespace := core.TagFixture()484
485client := blobclient.New(s.addr, blobclient.WithChunkSize(13))486
487err := client.TransferBlob(blob.Digest, bytes.NewReader(blob.Content))488require.NoError(err)489ensureHasBlob(t, client, namespace, blob)490}
491
492func TestOverwriteMetainfo(t *testing.T) {493require := require.New(t)494
495cp := newTestClientProvider()496
497s := newTestServer(t, master1, hashRingMaxReplica(), cp)498defer s.cleanup()499
500blob := core.NewBlobFixture()501namespace := core.TagFixture()502
503err := cp.Provide(master1).TransferBlob(blob.Digest, bytes.NewReader(blob.Content))504require.NoError(err)505
506mi, err := cp.Provide(master1).GetMetaInfo(namespace, blob.Digest)507require.NoError(err)508require.Equal(int64(4), mi.PieceLength())509
510err = cp.Provide(master1).OverwriteMetaInfo(blob.Digest, 16)511require.NoError(err)512
513mi, err = cp.Provide(master1).GetMetaInfo(namespace, blob.Digest)514require.NoError(err)515require.Equal(int64(16), mi.PieceLength())516}
517
518func TestReplicateToRemote(t *testing.T) {519require := require.New(t)520
521cp := newTestClientProvider()522
523s := newTestServer(t, master1, hashRingMaxReplica(), cp)524defer s.cleanup()525
526blob := core.NewBlobFixture()527namespace := core.TagFixture()528
529require.NoError(cp.Provide(master1).TransferBlob(blob.Digest, bytes.NewReader(blob.Content)))530
531remote := "remote:80"532
533remoteCluster := s.expectRemoteCluster(remote)534remoteCluster.EXPECT().UploadBlob(535namespace, blob.Digest, mockutil.MatchReader(blob.Content)).Return(nil)536
537require.NoError(cp.Provide(master1).ReplicateToRemote(namespace, blob.Digest, remote))538}
539
540func TestReplicateToRemoteInvalidParam(t *testing.T) {541digest := core.DigestFixture()542
543tests := []struct {544desc string545path string546status int547}{548{549"empty namespace",550fmt.Sprintf("namespace//blobs/%s/remote/bar", digest),551http.StatusBadRequest,552}, {553"invalid digest",554"namespace/hello/blobs/foo/remote/bar",555http.StatusBadRequest,556},557}558for _, test := range tests {559t.Run(test.desc, func(t *testing.T) {560require := require.New(t)561
562cp := newTestClientProvider()563
564s := newTestServer(t, master1, hashRingMaxReplica(), cp)565defer s.cleanup()566
567_, err := httputil.Post(fmt.Sprintf("http://%s/%s", s.addr, test.path))568require.Error(err)569require.True(httputil.IsStatus(err, test.status))570})571}572}
573
574func TestReplicateToRemoteWhenBlobInStorageBackend(t *testing.T) {575require := require.New(t)576
577cp := newTestClientProvider()578
579s := newTestServer(t, master1, hashRingMaxReplica(), cp)580defer s.cleanup()581
582blob := core.NewBlobFixture()583namespace := core.TagFixture()584
585backendClient := s.backendClient(namespace)586backendClient.EXPECT().Stat(namespace,587blob.Digest.Hex()).Return(core.NewBlobInfo(int64(len(blob.Content))), nil).AnyTimes()588backendClient.EXPECT().Download(namespace, blob.Digest.Hex(), mockutil.MatchWriter(blob.Content)).Return(nil)589
590remote := "remote:80"591
592remoteCluster := s.expectRemoteCluster(remote)593remoteCluster.EXPECT().UploadBlob(594namespace, blob.Digest, mockutil.MatchReader(blob.Content)).Return(nil)595
596require.NoError(testutil.PollUntilTrue(5*time.Second, func() bool {597err := cp.Provide(master1).ReplicateToRemote(namespace, blob.Digest, remote)598return !httputil.IsAccepted(err)599}))600}
601
602func TestUploadBlobDuplicatesWriteBackTaskToReplicas(t *testing.T) {603require := require.New(t)604
605ring := hashRingSomeReplica()606namespace := core.TagFixture()607
608cp := newTestClientProvider()609
610s1 := newTestServer(t, master1, ring, cp)611defer s1.cleanup()612
613s2 := newTestServer(t, master2, ring, cp)614defer s2.cleanup()615
616blob := computeBlobForHosts(ring, s1.host, s2.host)617
618s1.writeBackManager.EXPECT().Add(619writeback.MatchTask(writeback.NewTask(namespace, blob.Digest.Hex(), 0))).Return(nil)620s2.writeBackManager.EXPECT().Add(621writeback.MatchTask(writeback.NewTask(namespace, blob.Digest.Hex(), 30*time.Minute)))622
623err := cp.Provide(s1.host).UploadBlob(namespace, blob.Digest, bytes.NewReader(blob.Content))624require.NoError(err)625
626ensureHasBlob(t, cp.Provide(s1.host), namespace, blob)627ensureHasBlob(t, cp.Provide(s2.host), namespace, blob)628
629// Shouldn't be able to delete blob since it is still being written back.630require.Error(cp.Provide(s1.host).DeleteBlob(blob.Digest))631require.Error(cp.Provide(s2.host).DeleteBlob(blob.Digest))632}
633
634func TestUploadBlobRetriesWriteBackFailure(t *testing.T) {635require := require.New(t)636
637ring := hashRingNoReplica()638namespace := core.TagFixture()639
640cp := newTestClientProvider()641
642s := newTestServer(t, master1, ring, cp)643defer s.cleanup()644
645blob := computeBlobForHosts(ring, s.host)646
647expectedTask := writeback.MatchTask(writeback.NewTask(namespace, blob.Digest.Hex(), 0))648
649gomock.InOrder(650s.writeBackManager.EXPECT().Add(expectedTask).Return(errors.New("some error")),651s.writeBackManager.EXPECT().Add(expectedTask).Return(nil),652)653
654// Upload should "fail" because we failed to add a write-back task, but blob655// should still be present.656err := cp.Provide(s.host).UploadBlob(namespace, blob.Digest, bytes.NewReader(blob.Content))657require.Error(err)658ensureHasBlob(t, cp.Provide(s.host), namespace, blob)659
660// Uploading again should succeed.661err = cp.Provide(s.host).UploadBlob(namespace, blob.Digest, bytes.NewReader(blob.Content))662require.NoError(err)663
664// Shouldn't be able to delete blob since it is still being written back.665require.Error(cp.Provide(s.host).DeleteBlob(blob.Digest))666}
667
668func TestUploadBlobResilientToDuplicationFailure(t *testing.T) {669require := require.New(t)670
671ring := hashRingSomeReplica()672namespace := core.TagFixture()673
674cp := newTestClientProvider()675
676s := newTestServer(t, master1, ring, cp)677defer s.cleanup()678
679cp.register(master2, blobclient.New("localhost:0"))680
681blob := computeBlobForHosts(ring, s.host, master2)682
683s.writeBackManager.EXPECT().Add(684writeback.MatchTask(writeback.NewTask(namespace, blob.Digest.Hex(), 0))).Return(nil)685
686err := cp.Provide(s.host).UploadBlob(namespace, blob.Digest, bytes.NewReader(blob.Content))687require.NoError(err)688
689ensureHasBlob(t, cp.Provide(s.host), namespace, blob)690}
691
692func TestForceCleanupTTL(t *testing.T) {693require := require.New(t)694
695ring := hashRingNoReplica()696namespace := core.TagFixture()697
698cp := newTestClientProvider()699
700s := newTestServer(t, master1, ring, cp)701defer s.cleanup()702
703client := cp.Provide(s.host)704
705blob := computeBlobForHosts(ring, s.host)706
707s.writeBackManager.EXPECT().Add(708writeback.MatchTask(writeback.NewTask(namespace, blob.Digest.Hex(), 0))).Return(nil)709
710require.NoError(client.UploadBlob(namespace, blob.Digest, bytes.NewReader(blob.Content)))711
712ensureHasBlob(t, client, namespace, blob)713
714// Since the blob was just uploaded, it should not be deleted on force cleanup.715require.NoError(client.ForceCleanup(12 * time.Hour))716ensureHasBlob(t, client, namespace, blob)717
718s.clk.Add(14 * time.Hour)719
720s.writeBackManager.EXPECT().Find(writeback.NewNameQuery(blob.Digest.Hex())).Return(nil, nil)721
722require.NoError(client.ForceCleanup(12 * time.Hour))723
724_, err := client.StatLocal(namespace, blob.Digest)725require.Error(err)726require.Equal(blobclient.ErrBlobNotFound, err)727}
728
729func TestForceCleanupNonOwner(t *testing.T) {730require := require.New(t)731
732ring := hashRingNoReplica()733namespace := core.TagFixture()734
735cp := newTestClientProvider()736
737s1 := newTestServer(t, master1, ring, cp)738defer s1.cleanup()739
740s2 := newTestServer(t, master2, ring, cp)741defer s2.cleanup()742
743client := cp.Provide(s1.host)744
745// s1 does not own blob, but will still accept the upload. On ForceCleanup, it746// should be removed.747blob := computeBlobForHosts(ring, s2.host)748
749s1.writeBackManager.EXPECT().Add(750writeback.MatchTask(writeback.NewTask(namespace, blob.Digest.Hex(), 0))).Return(nil)751
752s2.writeBackManager.EXPECT().Add(753writeback.MatchTask(writeback.NewTask(namespace, blob.Digest.Hex(), 30*time.Minute)))754
755require.NoError(client.UploadBlob(namespace, blob.Digest, bytes.NewReader(blob.Content)))756
757ensureHasBlob(t, client, namespace, blob)758
759s1.writeBackManager.EXPECT().Find(writeback.NewNameQuery(blob.Digest.Hex())).Return(nil, nil)760
761require.NoError(client.ForceCleanup(12 * time.Hour))762
763_, err := client.StatLocal(namespace, blob.Digest)764require.Error(err)765require.Equal(blobclient.ErrBlobNotFound, err)766}
767
768func TestForceCleanupWriteBackFailures(t *testing.T) {769require := require.New(t)770
771ring := hashRingNoReplica()772namespace := core.TagFixture()773
774cp := newTestClientProvider()775
776s := newTestServer(t, master1, ring, cp)777defer s.cleanup()778
779client := cp.Provide(s.host)780
781blob := computeBlobForHosts(ring, s.host)782
783task := writeback.NewTask(namespace, blob.Digest.Hex(), 0)784
785s.writeBackManager.EXPECT().Add(writeback.MatchTask(task)).Return(nil)786
787require.NoError(client.UploadBlob(namespace, blob.Digest, bytes.NewReader(blob.Content)))788
789ensureHasBlob(t, client, namespace, blob)790
791s.clk.Add(14 * time.Hour)792
793// If there exists a writeback task, and it fails to manually execute it,794// the blob should not be deleted.795s.writeBackManager.EXPECT().Find(796writeback.NewNameQuery(blob.Digest.Hex())).Return([]persistedretry.Task{task}, nil)797
798s.writeBackManager.EXPECT().SyncExec(task).Return(errors.New("some error"))799
800require.NoError(client.ForceCleanup(12 * time.Hour))801
802ensureHasBlob(t, client, namespace, blob)803}
804