// kraken
// 379 lines · 10.1 KB
// Copyright (c) 2016-2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14package agentstorage
15
16import (
17"errors"
18"fmt"
19"io/ioutil"
20"math"
21"sync"
22"testing"
23"time"
24
25"github.com/uber/kraken/core"
26"github.com/uber/kraken/lib/store"
27"github.com/uber/kraken/lib/store/metadata"
28"github.com/uber/kraken/lib/torrent/storage"
29"github.com/uber/kraken/lib/torrent/storage/piecereader"
30"github.com/uber/kraken/mocks/lib/store"
31"github.com/uber/kraken/utils/bitsetutil"
32
33"github.com/golang/mock/gomock"
34"github.com/stretchr/testify/require"
35)
36
37func prepareStore(cads *store.CADownloadStore, mi *core.MetaInfo) {
38if err := cads.CreateDownloadFile(mi.Digest().Hex(), mi.Length()); err != nil {
39panic(err)
40}
41if _, err := cads.Download().SetMetadata(mi.Digest().Hex(), metadata.NewTorrentMeta(mi)); err != nil {
42panic(err)
43}
44}
45
46func TestTorrentCreate(t *testing.T) {
47require := require.New(t)
48
49cads, cleanup := store.CADownloadStoreFixture()
50defer cleanup()
51
52mi := core.SizedBlobFixture(7, 2).MetaInfo
53
54prepareStore(cads, mi)
55
56tor, err := NewTorrent(cads, mi)
57require.NoError(err)
58
59// New torrent
60require.Equal(mi.Digest(), tor.Digest())
61require.Equal(4, tor.NumPieces())
62require.Equal(int64(7), tor.Length())
63require.Equal(int64(2), tor.PieceLength(0))
64require.Equal(int64(1), tor.PieceLength(3))
65require.Equal(mi.InfoHash(), tor.InfoHash())
66require.False(tor.Complete())
67require.Equal(int64(0), tor.BytesDownloaded())
68require.Equal(bitsetutil.FromBools(false, false, false, false), tor.Bitfield())
69require.False(tor.HasPiece(0))
70require.Equal([]int{0, 1, 2, 3}, tor.MissingPieces())
71}
72
73func TestTorrentWriteUpdatesBytesDownloadedAndBitfield(t *testing.T) {
74require := require.New(t)
75
76cads, cleanup := store.CADownloadStoreFixture()
77defer cleanup()
78
79blob := core.SizedBlobFixture(2, 1)
80
81prepareStore(cads, blob.MetaInfo)
82
83tor, err := NewTorrent(cads, blob.MetaInfo)
84require.NoError(err)
85
86require.NoError(tor.WritePiece(piecereader.NewBuffer(blob.Content[:1]), 0))
87require.False(tor.Complete())
88require.Equal(int64(1), tor.BytesDownloaded())
89require.Equal(bitsetutil.FromBools(true, false), tor.Bitfield())
90}
91
92func TestTorrentWriteComplete(t *testing.T) {
93require := require.New(t)
94
95cads, cleanup := store.CADownloadStoreFixture()
96defer cleanup()
97
98blob := core.SizedBlobFixture(1, 1)
99
100prepareStore(cads, blob.MetaInfo)
101
102tor, err := NewTorrent(cads, blob.MetaInfo)
103require.NoError(err)
104
105require.NoError(tor.WritePiece(piecereader.NewBuffer(blob.Content), 0))
106
107r, err := tor.GetPieceReader(0)
108require.NoError(err)
109defer r.Close()
110result, err := ioutil.ReadAll(r)
111require.NoError(err)
112require.Equal(blob.Content, result)
113
114require.True(tor.Complete())
115require.Equal(int64(1), tor.BytesDownloaded())
116
117// Duplicate write should detect piece is complete.
118require.Equal(storage.ErrPieceComplete, tor.WritePiece(piecereader.NewBuffer(blob.Content[:1]), 0))
119}
120
121func TestTorrentWriteMultiplePieceConcurrent(t *testing.T) {
122require := require.New(t)
123
124cads, cleanup := store.CADownloadStoreFixture()
125defer cleanup()
126
127blob := core.SizedBlobFixture(7, 2)
128
129prepareStore(cads, blob.MetaInfo)
130
131tor, err := NewTorrent(cads, blob.MetaInfo)
132require.NoError(err)
133
134wg := sync.WaitGroup{}
135wg.Add(tor.NumPieces())
136for i := 0; i < tor.NumPieces(); i++ {
137go func(i int) {
138defer wg.Done()
139start := i * int(blob.MetaInfo.PieceLength())
140end := start + int(tor.PieceLength(i))
141require.NoError(tor.WritePiece(piecereader.NewBuffer(blob.Content[start:end]), i))
142}(i)
143}
144
145wg.Wait()
146
147// Complete
148require.True(tor.Complete())
149require.Equal(int64(7), tor.BytesDownloaded())
150require.Nil(tor.MissingPieces())
151
152// Check content
153reader, err := cads.Cache().GetFileReader(blob.MetaInfo.Digest().Hex())
154require.NoError(err)
155torrentBytes, err := ioutil.ReadAll(reader)
156require.NoError(err)
157require.Equal(blob.Content, torrentBytes)
158}
159
160func TestTorrentWriteSamePieceConcurrent(t *testing.T) {
161require := require.New(t)
162
163cads, cleanup := store.CADownloadStoreFixture()
164defer cleanup()
165
166blob := core.SizedBlobFixture(16, 1)
167
168prepareStore(cads, blob.MetaInfo)
169
170tor, err := NewTorrent(cads, blob.MetaInfo)
171require.NoError(err)
172
173var wg sync.WaitGroup
174for i := 0; i < 32; i++ {
175wg.Add(1)
176go func(i int) {
177defer wg.Done()
178
179pi := int(math.Mod(float64(i), float64(len(blob.Content))))
180
181// If another goroutine is currently writing, we should get errWritePieceConflict.
182// If another goroutine has finished writing, we should get storage.ErrPieceComplete.
183err := tor.WritePiece(piecereader.NewBuffer([]byte{blob.Content[pi]}), pi)
184if err != nil {
185require.Contains([]error{errWritePieceConflict, storage.ErrPieceComplete}, err)
186}
187
188start := time.Now()
189timeout := time.Duration(100 * time.Millisecond)
190for {
191time.Sleep(5 * time.Millisecond)
192if time.Since(start) > timeout {
193require.FailNow(fmt.Sprintf("failed to get piece reader %v after writing", timeout))
194}
195
196// If another goroutine was writing when we tried to, we will get errPieceNotComplete
197// until they finish.
198r, err := tor.GetPieceReader(pi)
199if err != nil {
200require.Equal(errPieceNotComplete, err)
201continue
202}
203defer r.Close()
204
205result, err := ioutil.ReadAll(r)
206require.NoError(err)
207require.Equal(1, len(result))
208require.Equal(1, len(result))
209require.Equal(blob.Content[pi], result[0])
210
211return
212}
213}(i)
214}
215wg.Wait()
216
217reader, err := cads.Cache().GetFileReader(blob.MetaInfo.Digest().Hex())
218require.NoError(err)
219torrentBytes, err := ioutil.ReadAll(reader)
220require.NoError(err)
221require.Equal(blob.Content, torrentBytes)
222}
223
224// mockGetDownloadFileReadWriterStore wraps an internal CADownloadStore but
225// overrides the GetDownloadFileReadWriter method to return f.
226type mockGetDownloadFileReadWriterStore struct {
227*store.CADownloadStore
228f store.FileReadWriter
229}
230
231func (s *mockGetDownloadFileReadWriterStore) GetDownloadFileReadWriter(
232name string) (store.FileReadWriter, error) {
233
234return s.f, nil
235}
236
237// coordinatedWriter allows blocking WriteAt calls to simulate race conditions.
238type coordinatedWriter struct {
239store.FileReadWriter
240startWriting chan bool
241stopWriting chan bool
242}
243
244func newCoordinatedWriter(f store.FileReadWriter) *coordinatedWriter {
245return &coordinatedWriter{f, make(chan bool), make(chan bool)}
246}
247
248func (w *coordinatedWriter) Write(b []byte) (int, error) {
249w.startWriting <- true
250<-w.stopWriting
251return len(b), nil
252}
253
254func TestTorrentWritePieceConflictsDoNotBlock(t *testing.T) {
255require := require.New(t)
256
257blob := core.SizedBlobFixture(1, 1)
258
259f, cleanup := store.NewMockFileReadWriter([]byte{})
260defer cleanup()
261
262w := newCoordinatedWriter(f)
263
264cads, cleanup := store.CADownloadStoreFixture()
265defer cleanup()
266
267prepareStore(cads, blob.MetaInfo)
268
269mockCADS := &mockGetDownloadFileReadWriterStore{cads, w}
270
271tor, err := NewTorrent(mockCADS, blob.MetaInfo)
272require.NoError(err)
273
274done := make(chan struct{})
275go func() {
276defer close(done)
277require.NoError(tor.WritePiece(piecereader.NewBuffer(blob.Content), 0))
278}()
279
280// Writing while another goroutine is mid-write should not block.
281<-w.startWriting
282require.Equal(errWritePieceConflict, tor.WritePiece(piecereader.NewBuffer(blob.Content), 0))
283w.stopWriting <- true
284
285<-done
286
287// Duplicate write should detect piece is complete.
288require.Equal(storage.ErrPieceComplete, tor.WritePiece(piecereader.NewBuffer(blob.Content), 0))
289}
290
291func TestTorrentWritePieceFailuresRemoveDirtyStatus(t *testing.T) {
292require := require.New(t)
293
294ctrl := gomock.NewController(t)
295defer ctrl.Finish()
296
297w := mockstore.NewMockFileReadWriter(ctrl)
298
299cads, cleanup := store.CADownloadStoreFixture()
300defer cleanup()
301
302blob := core.SizedBlobFixture(1, 1)
303
304prepareStore(cads, blob.MetaInfo)
305
306mockCADS := &mockGetDownloadFileReadWriterStore{cads, w}
307
308gomock.InOrder(
309// First write fails.
310w.EXPECT().Seek(int64(0), 0).Return(int64(0), nil),
311w.EXPECT().Write(blob.Content).Return(0, errors.New("first write error")),
312w.EXPECT().Close().Return(nil),
313
314// Second write succeeds.
315w.EXPECT().Seek(int64(0), 0).Return(int64(0), nil),
316w.EXPECT().Write(blob.Content).Return(len(blob.Content), nil),
317w.EXPECT().Close().Return(nil),
318)
319
320tor, err := NewTorrent(mockCADS, blob.MetaInfo)
321require.NoError(err)
322
323// After the first write fails, the dirty bit should be flipped to empty,
324// allowing future writes to succeed.
325require.Error(tor.WritePiece(piecereader.NewBuffer(blob.Content), 0))
326require.NoError(tor.WritePiece(piecereader.NewBuffer(blob.Content), 0))
327}
328
329func TestTorrentRestoreCompletedTorrent(t *testing.T) {
330require := require.New(t)
331
332cads, cleanup := store.CADownloadStoreFixture()
333defer cleanup()
334
335blob := core.SizedBlobFixture(8, 1)
336
337prepareStore(cads, blob.MetaInfo)
338
339tor, err := NewTorrent(cads, blob.MetaInfo)
340require.NoError(err)
341
342for i, b := range blob.Content {
343require.NoError(tor.WritePiece(piecereader.NewBuffer([]byte{b}), i))
344}
345
346require.True(tor.Complete())
347
348tor, err = NewTorrent(cads, blob.MetaInfo)
349require.NoError(err)
350
351require.True(tor.Complete())
352}
353
354func TestTorrentRestoreInProgressTorrent(t *testing.T) {
355require := require.New(t)
356
357cads, cleanup := store.CADownloadStoreFixture()
358defer cleanup()
359
360blob := core.SizedBlobFixture(8, 1)
361
362prepareStore(cads, blob.MetaInfo)
363
364tor, err := NewTorrent(cads, blob.MetaInfo)
365require.NoError(err)
366
367pi := 4
368
369require.NoError(tor.WritePiece(piecereader.NewBuffer([]byte{blob.Content[pi]}), pi))
370require.Equal(int64(1), tor.BytesDownloaded())
371
372tor, err = NewTorrent(cads, blob.MetaInfo)
373require.NoError(err)
374
375require.Equal(int64(1), tor.BytesDownloaded())
376require.Equal(
377storage.ErrPieceComplete,
378tor.WritePiece(piecereader.NewBuffer([]byte{blob.Content[pi]}), pi))
379}
380