// Copyright (c) 2016-2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package blobserver

import (
	"bytes"
	"errors"
	"fmt"
	"io/ioutil"
	"net/http"
	"testing"
	"time"

	"github.com/golang/mock/gomock"
	"github.com/stretchr/testify/require"

	"github.com/uber/kraken/core"
	"github.com/uber/kraken/lib/backend/backenderrors"
	"github.com/uber/kraken/lib/persistedretry"
	"github.com/uber/kraken/lib/persistedretry/writeback"
	"github.com/uber/kraken/lib/store/metadata"
	"github.com/uber/kraken/origin/blobclient"
	"github.com/uber/kraken/utils/httputil"
	"github.com/uber/kraken/utils/mockutil"
	"github.com/uber/kraken/utils/testutil"
)

func TestHealth(t *testing.T) {
	require := require.New(t)

	cp := newTestClientProvider()

	s := newTestServer(t, master1, hashRingMaxReplica(), cp)
	defer s.cleanup()

	resp, err := httputil.Get(
		fmt.Sprintf("http://%s/health", s.addr))
	require.NoError(err)
	defer resp.Body.Close()
	b, err := ioutil.ReadAll(resp.Body)
	require.NoError(err)
	require.Equal("OK\n", string(b))
}

func TestStatHandlerLocalNotFound(t *testing.T) {
	require := require.New(t)

	cp := newTestClientProvider()

	s := newTestServer(t, master1, hashRingMaxReplica(), cp)
	defer s.cleanup()

	d := core.DigestFixture()
	namespace := core.TagFixture()

	_, err := cp.Provide(s.host).StatLocal(namespace, d)
	require.Equal(blobclient.ErrBlobNotFound, err)
}

func TestStatHandlerInvalidParam(t *testing.T) {
	digest := core.DigestFixture()

	tests := []struct {
		desc   string
		path   string
		status int
	}{
		{
			"empty namespace",
			fmt.Sprintf("internal/namespace//blobs/%s", digest),
			http.StatusBadRequest,
		}, {
			"invalid digest",
			"internal/namespace/foo/blobs/bar",
			http.StatusBadRequest,
		}, {
			"invalid local param",
			fmt.Sprintf("internal/namespace/foo/blobs/%s?local=bar", digest),
			http.StatusInternalServerError,
		},
	}
	for _, test := range tests {
		t.Run(test.desc, func(t *testing.T) {
			require := require.New(t)

			cp := newTestClientProvider()

			s := newTestServer(t, master1, hashRingMaxReplica(), cp)
			defer s.cleanup()

			_, err := httputil.Head(fmt.Sprintf("http://%s/%s", s.addr, test.path))
			require.Error(err)
			require.True(httputil.IsStatus(err, test.status))
		})
	}
}

func TestStatHandlerNotFound(t *testing.T) {
	require := require.New(t)

	cp := newTestClientProvider()

	s := newTestServer(t, master1, hashRingMaxReplica(), cp)
	defer s.cleanup()

	d := core.DigestFixture()
	namespace := core.TagFixture()

	backendClient := s.backendClient(namespace)

	backendClient.EXPECT().Stat(namespace, d.Hex()).Return(nil, backenderrors.ErrBlobNotFound)

	_, err := cp.Provide(master1).Stat(namespace, d)
	require.Equal(blobclient.ErrBlobNotFound, err)
}

func TestStatHandlerReturnSize(t *testing.T) {
	require := require.New(t)

	cp := newTestClientProvider()

	s := newTestServer(t, master1, hashRingMaxReplica(), cp)
	defer s.cleanup()

	client := cp.Provide(s.host)
	blob := core.SizedBlobFixture(256, 8)
	namespace := core.TagFixture()

	require.NoError(client.TransferBlob(blob.Digest, bytes.NewReader(blob.Content)))

	ensureHasBlob(t, cp.Provide(s.host), namespace, blob)

	bi, err := cp.Provide(master1).Stat(namespace, blob.Digest)
	require.NoError(err)
	require.NotNil(bi)
	require.Equal(int64(256), bi.Size)
}

func TestDownloadBlobInvalidParam(t *testing.T) {
	digest := core.DigestFixture()

	tests := []struct {
		desc   string
		path   string
		status int
	}{
		{
			"empty namespace",
			fmt.Sprintf("namespace//blobs/%s", digest),
			http.StatusBadRequest,
		}, {
			"invalid digest",
			"namespace/foo/blobs/bar",
			http.StatusBadRequest,
		},
	}
	for _, test := range tests {
		t.Run(test.desc, func(t *testing.T) {
			require := require.New(t)

			cp := newTestClientProvider()

			s := newTestServer(t, master1, hashRingMaxReplica(), cp)
			defer s.cleanup()

			_, err := httputil.Get(fmt.Sprintf("http://%s/%s", s.addr, test.path))
			require.Error(err)
			require.True(httputil.IsStatus(err, test.status))
		})
	}
}

func TestDownloadBlobNotFound(t *testing.T) {
	require := require.New(t)

	cp := newTestClientProvider()

	s := newTestServer(t, master1, hashRingMaxReplica(), cp)
	defer s.cleanup()

	d := core.DigestFixture()
	namespace := core.TagFixture()

	backendClient := s.backendClient(namespace)
	backendClient.EXPECT().Stat(namespace, d.Hex()).Return(nil, backenderrors.ErrBlobNotFound)

	err := cp.Provide(master1).DownloadBlob(namespace, d, ioutil.Discard)
	require.Error(err)
	require.Equal(http.StatusNotFound, err.(httputil.StatusError).Status)
}

func TestDeleteBlob(t *testing.T) {
	require := require.New(t)

	cp := newTestClientProvider()

	s := newTestServer(t, master1, hashRingMaxReplica(), cp)
	defer s.cleanup()

	client := cp.Provide(s.host)

	blob := core.NewBlobFixture()
	namespace := core.TagFixture()

	require.NoError(client.TransferBlob(blob.Digest, bytes.NewReader(blob.Content)))

	ensureHasBlob(t, cp.Provide(s.host), namespace, blob)

	require.NoError(client.DeleteBlob(blob.Digest))

	_, err := client.StatLocal(namespace, blob.Digest)
	require.Equal(blobclient.ErrBlobNotFound, err)
}

func TestDeleteBlobInvalidParam(t *testing.T) {
	require := require.New(t)

	cp := newTestClientProvider()

	s := newTestServer(t, master1, hashRingMaxReplica(), cp)
	defer s.cleanup()

	_, err := httputil.Delete(fmt.Sprintf("http://%s/internal/blobs/foo", s.addr))
	require.Error(err)
	require.True(httputil.IsStatus(err, http.StatusBadRequest))
}

func TestGetLocationsOK(t *testing.T) {
	require := require.New(t)

	cp := newTestClientProvider()
	ring := hashRingSomeReplica()

	s := newTestServer(t, master1, ring, cp)
	defer s.cleanup()

	blob := computeBlobForHosts(ring, master1, master2)

	locs, err := cp.Provide(s.host).Locations(blob.Digest)
	require.NoError(err)
	require.ElementsMatch([]string{master1, master2}, locs)
}

func TestGetPeerContextOK(t *testing.T) {
	require := require.New(t)

	cp := newTestClientProvider()

	s := newTestServer(t, master1, hashRingSomeReplica(), cp)
	defer s.cleanup()

	pctx, err := cp.Provide(master1).GetPeerContext()
	require.NoError(err)
	require.Equal(s.pctx, pctx)
}

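// TestGetMetaInfoDownloadsBlobAndReplicates verifies that requesting metainfo
// for a blob which only exists in the storage backend returns 202 Accepted
// while the origin downloads it, eventually succeeds with the correct length,
// and that the blob ends up replicated to the other master in the ring.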
func TestGetMetaInfoDownloadsBlobAndReplicates(t *testing.T) {
	require := require.New(t)

	ring := hashRingSomeReplica()
	cp := newTestClientProvider()
	namespace := core.TagFixture()

	s1 := newTestServer(t, master1, ring, cp)
	defer s1.cleanup()

	s2 := newTestServer(t, master2, ring, cp)
	defer s2.cleanup()

	blob := computeBlobForHosts(ring, s1.host, s2.host)

	backendClient := s1.backendClient(namespace)
	backendClient.EXPECT().Stat(namespace,
		blob.Digest.Hex()).Return(core.NewBlobInfo(int64(len(blob.Content))), nil).AnyTimes()
	backendClient.EXPECT().Download(namespace, blob.Digest.Hex(), mockutil.MatchWriter(blob.Content)).Return(nil)

	mi, err := cp.Provide(master1).GetMetaInfo(namespace, blob.Digest)
	require.True(httputil.IsAccepted(err))
	require.Nil(mi)

	require.NoError(testutil.PollUntilTrue(5*time.Second, func() bool {
		_, err := cp.Provide(master1).GetMetaInfo(namespace, blob.Digest)
		return !httputil.IsAccepted(err)
	}))

	mi, err = cp.Provide(master1).GetMetaInfo(namespace, blob.Digest)
	require.NoError(err)
	require.NotNil(mi)
	require.Equal(len(blob.Content), int(mi.Length()))

	// Ensure blob was replicated to other master.
	require.NoError(testutil.PollUntilTrue(5*time.Second, func() bool {
		_, err := cp.Provide(master2).StatLocal(namespace, blob.Digest)
		return err == nil
	}))
}

func TestGetMetaInfoBlobNotFound(t *testing.T) {
	require := require.New(t)

	cp := newTestClientProvider()

	s := newTestServer(t, master1, hashRingMaxReplica(), cp)
	defer s.cleanup()

	d := core.DigestFixture()
	namespace := core.TagFixture()

	backendClient := s.backendClient(namespace)
	backendClient.EXPECT().Stat(namespace, d.Hex()).Return(nil, backenderrors.ErrBlobNotFound)

	mi, err := cp.Provide(master1).GetMetaInfo(namespace, d)
	require.True(httputil.IsNotFound(err))
	require.Nil(mi)
}

func TestGetMetaInfoInvalidParam(t *testing.T) {
	digest := core.DigestFixture()

	tests := []struct {
		desc   string
		path   string
		status int
	}{
		{
			"empty namespace",
			fmt.Sprintf("internal/namespace//blobs/%s/metainfo", digest),
			http.StatusBadRequest,
		}, {
			"invalid digest",
			"internal/namespace/foo/blobs/bar/metainfo",
			http.StatusBadRequest,
		},
	}
	for _, test := range tests {
		t.Run(test.desc, func(t *testing.T) {
			require := require.New(t)

			cp := newTestClientProvider()

			s := newTestServer(t, master1, hashRingMaxReplica(), cp)
			defer s.cleanup()

			_, err := httputil.Get(fmt.Sprintf("http://%s/%s", s.addr, test.path))
			require.Error(err)
			require.True(httputil.IsStatus(err, test.status))
		})
	}
}

func TestTransferBlob(t *testing.T) {
	require := require.New(t)

	cp := newTestClientProvider()

	s := newTestServer(t, master1, hashRingMaxReplica(), cp)
	defer s.cleanup()

	blob := core.NewBlobFixture()
	namespace := core.TagFixture()

	err := cp.Provide(master1).TransferBlob(blob.Digest, bytes.NewReader(blob.Content))
	require.NoError(err)
	ensureHasBlob(t, cp.Provide(master1), namespace, blob)

	// Ensure metainfo was generated.
	var tm metadata.TorrentMeta
	require.NoError(s.cas.GetCacheFileMetadata(blob.Digest.Hex(), &tm))

	// Pushing again should be a no-op.
	err = cp.Provide(master1).TransferBlob(blob.Digest, bytes.NewReader(blob.Content))
	require.NoError(err)
	ensureHasBlob(t, cp.Provide(master1), namespace, blob)
}

func TestTransferBlobInvalidParam(t *testing.T) {
	t.Run("StartInvalidDigest", func(t *testing.T) {
		require := require.New(t)

		cp := newTestClientProvider()
		s := newTestServer(t, master1, hashRingMaxReplica(), cp)
		defer s.cleanup()

		_, err := httputil.Post(
			fmt.Sprintf("http://%s/internal/blobs/foo/uploads", s.addr))
		require.Error(err)
		require.True(httputil.IsStatus(err, http.StatusBadRequest))
	})
	t.Run("PatchInvalidDigest", func(t *testing.T) {
		require := require.New(t)

		cp := newTestClientProvider()
		s := newTestServer(t, master1, hashRingMaxReplica(), cp)
		defer s.cleanup()

		d := core.DigestFixture()
		_, err := httputil.Post(
			fmt.Sprintf("http://%s/internal/blobs/%s/uploads", s.addr, d.String()))
		require.NoError(err)
		_, err = httputil.Patch(
			fmt.Sprintf("http://%s/internal/blobs/foo/uploads/bar", s.addr),
			httputil.SendHeaders(map[string]string{
				"Content-Range": fmt.Sprintf("%d-%d", 0, 0),
			}))
		require.Error(err)
		require.True(httputil.IsStatus(err, http.StatusBadRequest))
	})
	t.Run("PatchNonexistentUploadUUID", func(t *testing.T) {
		require := require.New(t)

		cp := newTestClientProvider()
		s := newTestServer(t, master1, hashRingMaxReplica(), cp)
		defer s.cleanup()

		d := core.DigestFixture()
		_, err := httputil.Post(
			fmt.Sprintf("http://%s/internal/blobs/%s/uploads", s.addr, d.String()))
		require.NoError(err)

		_, err = httputil.Patch(
			fmt.Sprintf("http://%s/internal/blobs/%s/uploads/bar", s.addr, d.String()),
			httputil.SendHeaders(map[string]string{
				"Content-Range": fmt.Sprintf("%d-%d", 0, 0),
			}))
		require.Error(err)
		require.True(httputil.IsStatus(err, http.StatusNotFound))
	})
	t.Run("CommitInvalidDigest", func(t *testing.T) {
		require := require.New(t)

		cp := newTestClientProvider()
		s := newTestServer(t, master1, hashRingMaxReplica(), cp)
		defer s.cleanup()

		d := core.DigestFixture()
		_, err := httputil.Post(
			fmt.Sprintf("http://%s/internal/blobs/%s/uploads", s.addr, d.String()))
		require.NoError(err)

		_, err = httputil.Put(
			fmt.Sprintf("http://%s/internal/blobs/foo/uploads/bar", s.addr))
		require.Error(err)
		require.True(httputil.IsStatus(err, http.StatusBadRequest))
	})
	t.Run("CommitNonexistentUploadUUID", func(t *testing.T) {
		require := require.New(t)

		cp := newTestClientProvider()
		s := newTestServer(t, master1, hashRingMaxReplica(), cp)
		defer s.cleanup()

		d := core.DigestFixture()
		_, err := httputil.Post(
			fmt.Sprintf("http://%s/internal/blobs/%s/uploads", s.addr, d.String()))
		require.NoError(err)

		_, err = httputil.Put(
			fmt.Sprintf("http://%s/internal/blobs/%s/uploads/bar", s.addr, d.String()))
		require.Error(err)
		require.True(httputil.IsStatus(err, http.StatusNotFound))
	})
}

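// TestTransferBlobSmallChunkSize verifies that a blob larger than the client's
// configured chunk size is transferred correctly across multiple chunked uploads.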
func TestTransferBlobSmallChunkSize(t *testing.T) {
	require := require.New(t)

	s := newTestServer(t, master1, hashRingMaxReplica(), newTestClientProvider())
	defer s.cleanup()

	blob := core.SizedBlobFixture(1000, 1)
	namespace := core.TagFixture()

	client := blobclient.New(s.addr, blobclient.WithChunkSize(13))

	err := client.TransferBlob(blob.Digest, bytes.NewReader(blob.Content))
	require.NoError(err)
	ensureHasBlob(t, client, namespace, blob)
}

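// TestOverwriteMetainfo verifies that OverwriteMetaInfo regenerates a blob's
// torrent metainfo with the requested piece length.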
func TestOverwriteMetainfo(t *testing.T) {
	require := require.New(t)

	cp := newTestClientProvider()

	s := newTestServer(t, master1, hashRingMaxReplica(), cp)
	defer s.cleanup()

	blob := core.NewBlobFixture()
	namespace := core.TagFixture()

	err := cp.Provide(master1).TransferBlob(blob.Digest, bytes.NewReader(blob.Content))
	require.NoError(err)

	mi, err := cp.Provide(master1).GetMetaInfo(namespace, blob.Digest)
	require.NoError(err)
	require.Equal(int64(4), mi.PieceLength())

	err = cp.Provide(master1).OverwriteMetaInfo(blob.Digest, 16)
	require.NoError(err)

	mi, err = cp.Provide(master1).GetMetaInfo(namespace, blob.Digest)
	require.NoError(err)
	require.Equal(int64(16), mi.PieceLength())
}

func TestReplicateToRemote(t *testing.T) {
	require := require.New(t)

	cp := newTestClientProvider()

	s := newTestServer(t, master1, hashRingMaxReplica(), cp)
	defer s.cleanup()

	blob := core.NewBlobFixture()
	namespace := core.TagFixture()

	require.NoError(cp.Provide(master1).TransferBlob(blob.Digest, bytes.NewReader(blob.Content)))

	remote := "remote:80"

	remoteCluster := s.expectRemoteCluster(remote)
	remoteCluster.EXPECT().UploadBlob(
		namespace, blob.Digest, mockutil.MatchReader(blob.Content)).Return(nil)

	require.NoError(cp.Provide(master1).ReplicateToRemote(namespace, blob.Digest, remote))
}

func TestReplicateToRemoteInvalidParam(t *testing.T) {
	digest := core.DigestFixture()

	tests := []struct {
		desc   string
		path   string
		status int
	}{
		{
			"empty namespace",
			fmt.Sprintf("namespace//blobs/%s/remote/bar", digest),
			http.StatusBadRequest,
		}, {
			"invalid digest",
			"namespace/hello/blobs/foo/remote/bar",
			http.StatusBadRequest,
		},
	}
	for _, test := range tests {
		t.Run(test.desc, func(t *testing.T) {
			require := require.New(t)

			cp := newTestClientProvider()

			s := newTestServer(t, master1, hashRingMaxReplica(), cp)
			defer s.cleanup()

			_, err := httputil.Post(fmt.Sprintf("http://%s/%s", s.addr, test.path))
			require.Error(err)
			require.True(httputil.IsStatus(err, test.status))
		})
	}
}

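// TestReplicateToRemoteWhenBlobInStorageBackend verifies that remote
// replication still succeeds when the blob is not on local disk: the origin
// first fetches it from the storage backend, returning 202 Accepted until the
// download completes.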
func TestReplicateToRemoteWhenBlobInStorageBackend(t *testing.T) {
	require := require.New(t)

	cp := newTestClientProvider()

	s := newTestServer(t, master1, hashRingMaxReplica(), cp)
	defer s.cleanup()

	blob := core.NewBlobFixture()
	namespace := core.TagFixture()

	backendClient := s.backendClient(namespace)
	backendClient.EXPECT().Stat(namespace,
		blob.Digest.Hex()).Return(core.NewBlobInfo(int64(len(blob.Content))), nil).AnyTimes()
	backendClient.EXPECT().Download(namespace, blob.Digest.Hex(), mockutil.MatchWriter(blob.Content)).Return(nil)

	remote := "remote:80"

	remoteCluster := s.expectRemoteCluster(remote)
	remoteCluster.EXPECT().UploadBlob(
		namespace, blob.Digest, mockutil.MatchReader(blob.Content)).Return(nil)

	require.NoError(testutil.PollUntilTrue(5*time.Second, func() bool {
		err := cp.Provide(master1).ReplicateToRemote(namespace, blob.Digest, remote)
		return !httputil.IsAccepted(err)
	}))
}

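// TestUploadBlobDuplicatesWriteBackTaskToReplicas verifies that an upload is
// duplicated to the replica master and that both masters schedule a write-back
// task: an immediate one on the upload target and a 30-minute delayed one on
// the replica.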
func TestUploadBlobDuplicatesWriteBackTaskToReplicas(t *testing.T) {
	require := require.New(t)

	ring := hashRingSomeReplica()
	namespace := core.TagFixture()

	cp := newTestClientProvider()

	s1 := newTestServer(t, master1, ring, cp)
	defer s1.cleanup()

	s2 := newTestServer(t, master2, ring, cp)
	defer s2.cleanup()

	blob := computeBlobForHosts(ring, s1.host, s2.host)

	s1.writeBackManager.EXPECT().Add(
		writeback.MatchTask(writeback.NewTask(namespace, blob.Digest.Hex(), 0))).Return(nil)
	s2.writeBackManager.EXPECT().Add(
		writeback.MatchTask(writeback.NewTask(namespace, blob.Digest.Hex(), 30*time.Minute)))

	err := cp.Provide(s1.host).UploadBlob(namespace, blob.Digest, bytes.NewReader(blob.Content))
	require.NoError(err)

	ensureHasBlob(t, cp.Provide(s1.host), namespace, blob)
	ensureHasBlob(t, cp.Provide(s2.host), namespace, blob)

	// Shouldn't be able to delete blob since it is still being written back.
	require.Error(cp.Provide(s1.host).DeleteBlob(blob.Digest))
	require.Error(cp.Provide(s2.host).DeleteBlob(blob.Digest))
}

func TestUploadBlobRetriesWriteBackFailure(t *testing.T) {
	require := require.New(t)

	ring := hashRingNoReplica()
	namespace := core.TagFixture()

	cp := newTestClientProvider()

	s := newTestServer(t, master1, ring, cp)
	defer s.cleanup()

	blob := computeBlobForHosts(ring, s.host)

	expectedTask := writeback.MatchTask(writeback.NewTask(namespace, blob.Digest.Hex(), 0))

	gomock.InOrder(
		s.writeBackManager.EXPECT().Add(expectedTask).Return(errors.New("some error")),
		s.writeBackManager.EXPECT().Add(expectedTask).Return(nil),
	)

	// Upload should "fail" because we failed to add a write-back task, but blob
	// should still be present.
	err := cp.Provide(s.host).UploadBlob(namespace, blob.Digest, bytes.NewReader(blob.Content))
	require.Error(err)
	ensureHasBlob(t, cp.Provide(s.host), namespace, blob)

	// Uploading again should succeed.
	err = cp.Provide(s.host).UploadBlob(namespace, blob.Digest, bytes.NewReader(blob.Content))
	require.NoError(err)

	// Shouldn't be able to delete blob since it is still being written back.
	require.Error(cp.Provide(s.host).DeleteBlob(blob.Digest))
}

func TestUploadBlobResilientToDuplicationFailure(t *testing.T) {
	require := require.New(t)

	ring := hashRingSomeReplica()
	namespace := core.TagFixture()

	cp := newTestClientProvider()

	s := newTestServer(t, master1, ring, cp)
	defer s.cleanup()

	cp.register(master2, blobclient.New("localhost:0"))

	blob := computeBlobForHosts(ring, s.host, master2)

	s.writeBackManager.EXPECT().Add(
		writeback.MatchTask(writeback.NewTask(namespace, blob.Digest.Hex(), 0))).Return(nil)

	err := cp.Provide(s.host).UploadBlob(namespace, blob.Digest, bytes.NewReader(blob.Content))
	require.NoError(err)

	ensureHasBlob(t, cp.Provide(s.host), namespace, blob)
}

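// TestForceCleanupTTL verifies that ForceCleanup only removes blobs older than
// the given TTL: a freshly uploaded blob survives, but once the clock advances
// past the TTL it is deleted.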
func TestForceCleanupTTL(t *testing.T) {
	require := require.New(t)

	ring := hashRingNoReplica()
	namespace := core.TagFixture()

	cp := newTestClientProvider()

	s := newTestServer(t, master1, ring, cp)
	defer s.cleanup()

	client := cp.Provide(s.host)

	blob := computeBlobForHosts(ring, s.host)

	s.writeBackManager.EXPECT().Add(
		writeback.MatchTask(writeback.NewTask(namespace, blob.Digest.Hex(), 0))).Return(nil)

	require.NoError(client.UploadBlob(namespace, blob.Digest, bytes.NewReader(blob.Content)))

	ensureHasBlob(t, client, namespace, blob)

	// Since the blob was just uploaded, it should not be deleted on force cleanup.
	require.NoError(client.ForceCleanup(12 * time.Hour))
	ensureHasBlob(t, client, namespace, blob)

	s.clk.Add(14 * time.Hour)

	s.writeBackManager.EXPECT().Find(writeback.NewNameQuery(blob.Digest.Hex())).Return(nil, nil)

	require.NoError(client.ForceCleanup(12 * time.Hour))

	_, err := client.StatLocal(namespace, blob.Digest)
	require.Error(err)
	require.Equal(blobclient.ErrBlobNotFound, err)
}

func TestForceCleanupNonOwner(t *testing.T) {
	require := require.New(t)

	ring := hashRingNoReplica()
	namespace := core.TagFixture()

	cp := newTestClientProvider()

	s1 := newTestServer(t, master1, ring, cp)
	defer s1.cleanup()

	s2 := newTestServer(t, master2, ring, cp)
	defer s2.cleanup()

	client := cp.Provide(s1.host)

	// s1 does not own the blob but will still accept the upload. On ForceCleanup,
	// the blob should be removed.
	blob := computeBlobForHosts(ring, s2.host)

	s1.writeBackManager.EXPECT().Add(
		writeback.MatchTask(writeback.NewTask(namespace, blob.Digest.Hex(), 0))).Return(nil)

	s2.writeBackManager.EXPECT().Add(
		writeback.MatchTask(writeback.NewTask(namespace, blob.Digest.Hex(), 30*time.Minute)))

	require.NoError(client.UploadBlob(namespace, blob.Digest, bytes.NewReader(blob.Content)))

	ensureHasBlob(t, client, namespace, blob)

	s1.writeBackManager.EXPECT().Find(writeback.NewNameQuery(blob.Digest.Hex())).Return(nil, nil)

	require.NoError(client.ForceCleanup(12 * time.Hour))

	_, err := client.StatLocal(namespace, blob.Digest)
	require.Error(err)
	require.Equal(blobclient.ErrBlobNotFound, err)
}

func TestForceCleanupWriteBackFailures(t *testing.T) {
	require := require.New(t)

	ring := hashRingNoReplica()
	namespace := core.TagFixture()

	cp := newTestClientProvider()

	s := newTestServer(t, master1, ring, cp)
	defer s.cleanup()

	client := cp.Provide(s.host)

	blob := computeBlobForHosts(ring, s.host)

	task := writeback.NewTask(namespace, blob.Digest.Hex(), 0)

	s.writeBackManager.EXPECT().Add(writeback.MatchTask(task)).Return(nil)

	require.NoError(client.UploadBlob(namespace, blob.Digest, bytes.NewReader(blob.Content)))

	ensureHasBlob(t, client, namespace, blob)

	s.clk.Add(14 * time.Hour)

	// If a write-back task exists and executing it manually fails, the blob
	// should not be deleted.
	s.writeBackManager.EXPECT().Find(
		writeback.NewNameQuery(blob.Digest.Hex())).Return([]persistedretry.Task{task}, nil)

	s.writeBackManager.EXPECT().SyncExec(task).Return(errors.New("some error"))

	require.NoError(client.ForceCleanup(12 * time.Hour))

	ensureHasBlob(t, client, namespace, blob)
}