kraken

Форк
0
379 строк · 10.1 Кб
1
// Copyright (c) 2016-2019 Uber Technologies, Inc.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
package agentstorage
15

16
import (
17
	"errors"
18
	"fmt"
19
	"io/ioutil"
20
	"math"
21
	"sync"
22
	"testing"
23
	"time"
24

25
	"github.com/uber/kraken/core"
26
	"github.com/uber/kraken/lib/store"
27
	"github.com/uber/kraken/lib/store/metadata"
28
	"github.com/uber/kraken/lib/torrent/storage"
29
	"github.com/uber/kraken/lib/torrent/storage/piecereader"
30
	"github.com/uber/kraken/mocks/lib/store"
31
	"github.com/uber/kraken/utils/bitsetutil"
32

33
	"github.com/golang/mock/gomock"
34
	"github.com/stretchr/testify/require"
35
)
36

37
func prepareStore(cads *store.CADownloadStore, mi *core.MetaInfo) {
38
	if err := cads.CreateDownloadFile(mi.Digest().Hex(), mi.Length()); err != nil {
39
		panic(err)
40
	}
41
	if _, err := cads.Download().SetMetadata(mi.Digest().Hex(), metadata.NewTorrentMeta(mi)); err != nil {
42
		panic(err)
43
	}
44
}
45

46
func TestTorrentCreate(t *testing.T) {
47
	require := require.New(t)
48

49
	cads, cleanup := store.CADownloadStoreFixture()
50
	defer cleanup()
51

52
	mi := core.SizedBlobFixture(7, 2).MetaInfo
53

54
	prepareStore(cads, mi)
55

56
	tor, err := NewTorrent(cads, mi)
57
	require.NoError(err)
58

59
	// New torrent
60
	require.Equal(mi.Digest(), tor.Digest())
61
	require.Equal(4, tor.NumPieces())
62
	require.Equal(int64(7), tor.Length())
63
	require.Equal(int64(2), tor.PieceLength(0))
64
	require.Equal(int64(1), tor.PieceLength(3))
65
	require.Equal(mi.InfoHash(), tor.InfoHash())
66
	require.False(tor.Complete())
67
	require.Equal(int64(0), tor.BytesDownloaded())
68
	require.Equal(bitsetutil.FromBools(false, false, false, false), tor.Bitfield())
69
	require.False(tor.HasPiece(0))
70
	require.Equal([]int{0, 1, 2, 3}, tor.MissingPieces())
71
}
72

73
func TestTorrentWriteUpdatesBytesDownloadedAndBitfield(t *testing.T) {
74
	require := require.New(t)
75

76
	cads, cleanup := store.CADownloadStoreFixture()
77
	defer cleanup()
78

79
	blob := core.SizedBlobFixture(2, 1)
80

81
	prepareStore(cads, blob.MetaInfo)
82

83
	tor, err := NewTorrent(cads, blob.MetaInfo)
84
	require.NoError(err)
85

86
	require.NoError(tor.WritePiece(piecereader.NewBuffer(blob.Content[:1]), 0))
87
	require.False(tor.Complete())
88
	require.Equal(int64(1), tor.BytesDownloaded())
89
	require.Equal(bitsetutil.FromBools(true, false), tor.Bitfield())
90
}
91

92
func TestTorrentWriteComplete(t *testing.T) {
93
	require := require.New(t)
94

95
	cads, cleanup := store.CADownloadStoreFixture()
96
	defer cleanup()
97

98
	blob := core.SizedBlobFixture(1, 1)
99

100
	prepareStore(cads, blob.MetaInfo)
101

102
	tor, err := NewTorrent(cads, blob.MetaInfo)
103
	require.NoError(err)
104

105
	require.NoError(tor.WritePiece(piecereader.NewBuffer(blob.Content), 0))
106

107
	r, err := tor.GetPieceReader(0)
108
	require.NoError(err)
109
	defer r.Close()
110
	result, err := ioutil.ReadAll(r)
111
	require.NoError(err)
112
	require.Equal(blob.Content, result)
113

114
	require.True(tor.Complete())
115
	require.Equal(int64(1), tor.BytesDownloaded())
116

117
	// Duplicate write should detect piece is complete.
118
	require.Equal(storage.ErrPieceComplete, tor.WritePiece(piecereader.NewBuffer(blob.Content[:1]), 0))
119
}
120

121
func TestTorrentWriteMultiplePieceConcurrent(t *testing.T) {
122
	require := require.New(t)
123

124
	cads, cleanup := store.CADownloadStoreFixture()
125
	defer cleanup()
126

127
	blob := core.SizedBlobFixture(7, 2)
128

129
	prepareStore(cads, blob.MetaInfo)
130

131
	tor, err := NewTorrent(cads, blob.MetaInfo)
132
	require.NoError(err)
133

134
	wg := sync.WaitGroup{}
135
	wg.Add(tor.NumPieces())
136
	for i := 0; i < tor.NumPieces(); i++ {
137
		go func(i int) {
138
			defer wg.Done()
139
			start := i * int(blob.MetaInfo.PieceLength())
140
			end := start + int(tor.PieceLength(i))
141
			require.NoError(tor.WritePiece(piecereader.NewBuffer(blob.Content[start:end]), i))
142
		}(i)
143
	}
144

145
	wg.Wait()
146

147
	// Complete
148
	require.True(tor.Complete())
149
	require.Equal(int64(7), tor.BytesDownloaded())
150
	require.Nil(tor.MissingPieces())
151

152
	// Check content
153
	reader, err := cads.Cache().GetFileReader(blob.MetaInfo.Digest().Hex())
154
	require.NoError(err)
155
	torrentBytes, err := ioutil.ReadAll(reader)
156
	require.NoError(err)
157
	require.Equal(blob.Content, torrentBytes)
158
}
159

160
func TestTorrentWriteSamePieceConcurrent(t *testing.T) {
161
	require := require.New(t)
162

163
	cads, cleanup := store.CADownloadStoreFixture()
164
	defer cleanup()
165

166
	blob := core.SizedBlobFixture(16, 1)
167

168
	prepareStore(cads, blob.MetaInfo)
169

170
	tor, err := NewTorrent(cads, blob.MetaInfo)
171
	require.NoError(err)
172

173
	var wg sync.WaitGroup
174
	for i := 0; i < 32; i++ {
175
		wg.Add(1)
176
		go func(i int) {
177
			defer wg.Done()
178

179
			pi := int(math.Mod(float64(i), float64(len(blob.Content))))
180

181
			// If another goroutine is currently writing, we should get errWritePieceConflict.
182
			// If another goroutine has finished writing, we should get storage.ErrPieceComplete.
183
			err := tor.WritePiece(piecereader.NewBuffer([]byte{blob.Content[pi]}), pi)
184
			if err != nil {
185
				require.Contains([]error{errWritePieceConflict, storage.ErrPieceComplete}, err)
186
			}
187

188
			start := time.Now()
189
			timeout := time.Duration(100 * time.Millisecond)
190
			for {
191
				time.Sleep(5 * time.Millisecond)
192
				if time.Since(start) > timeout {
193
					require.FailNow(fmt.Sprintf("failed to get piece reader %v after writing", timeout))
194
				}
195

196
				// If another goroutine was writing when we tried to, we will get errPieceNotComplete
197
				// until they finish.
198
				r, err := tor.GetPieceReader(pi)
199
				if err != nil {
200
					require.Equal(errPieceNotComplete, err)
201
					continue
202
				}
203
				defer r.Close()
204

205
				result, err := ioutil.ReadAll(r)
206
				require.NoError(err)
207
				require.Equal(1, len(result))
208
				require.Equal(1, len(result))
209
				require.Equal(blob.Content[pi], result[0])
210

211
				return
212
			}
213
		}(i)
214
	}
215
	wg.Wait()
216

217
	reader, err := cads.Cache().GetFileReader(blob.MetaInfo.Digest().Hex())
218
	require.NoError(err)
219
	torrentBytes, err := ioutil.ReadAll(reader)
220
	require.NoError(err)
221
	require.Equal(blob.Content, torrentBytes)
222
}
223

224
// mockGetDownloadFileReadWriterStore wraps an internal CADownloadStore but
225
// overrides the GetDownloadFileReadWriter method to return f.
226
type mockGetDownloadFileReadWriterStore struct {
227
	*store.CADownloadStore
228
	f store.FileReadWriter
229
}
230

231
func (s *mockGetDownloadFileReadWriterStore) GetDownloadFileReadWriter(
232
	name string) (store.FileReadWriter, error) {
233

234
	return s.f, nil
235
}
236

237
// coordinatedWriter allows blocking WriteAt calls to simulate race conditions.
238
type coordinatedWriter struct {
239
	store.FileReadWriter
240
	startWriting chan bool
241
	stopWriting  chan bool
242
}
243

244
func newCoordinatedWriter(f store.FileReadWriter) *coordinatedWriter {
245
	return &coordinatedWriter{f, make(chan bool), make(chan bool)}
246
}
247

248
func (w *coordinatedWriter) Write(b []byte) (int, error) {
249
	w.startWriting <- true
250
	<-w.stopWriting
251
	return len(b), nil
252
}
253

254
func TestTorrentWritePieceConflictsDoNotBlock(t *testing.T) {
255
	require := require.New(t)
256

257
	blob := core.SizedBlobFixture(1, 1)
258

259
	f, cleanup := store.NewMockFileReadWriter([]byte{})
260
	defer cleanup()
261

262
	w := newCoordinatedWriter(f)
263

264
	cads, cleanup := store.CADownloadStoreFixture()
265
	defer cleanup()
266

267
	prepareStore(cads, blob.MetaInfo)
268

269
	mockCADS := &mockGetDownloadFileReadWriterStore{cads, w}
270

271
	tor, err := NewTorrent(mockCADS, blob.MetaInfo)
272
	require.NoError(err)
273

274
	done := make(chan struct{})
275
	go func() {
276
		defer close(done)
277
		require.NoError(tor.WritePiece(piecereader.NewBuffer(blob.Content), 0))
278
	}()
279

280
	// Writing while another goroutine is mid-write should not block.
281
	<-w.startWriting
282
	require.Equal(errWritePieceConflict, tor.WritePiece(piecereader.NewBuffer(blob.Content), 0))
283
	w.stopWriting <- true
284

285
	<-done
286

287
	// Duplicate write should detect piece is complete.
288
	require.Equal(storage.ErrPieceComplete, tor.WritePiece(piecereader.NewBuffer(blob.Content), 0))
289
}
290

291
func TestTorrentWritePieceFailuresRemoveDirtyStatus(t *testing.T) {
292
	require := require.New(t)
293

294
	ctrl := gomock.NewController(t)
295
	defer ctrl.Finish()
296

297
	w := mockstore.NewMockFileReadWriter(ctrl)
298

299
	cads, cleanup := store.CADownloadStoreFixture()
300
	defer cleanup()
301

302
	blob := core.SizedBlobFixture(1, 1)
303

304
	prepareStore(cads, blob.MetaInfo)
305

306
	mockCADS := &mockGetDownloadFileReadWriterStore{cads, w}
307

308
	gomock.InOrder(
309
		// First write fails.
310
		w.EXPECT().Seek(int64(0), 0).Return(int64(0), nil),
311
		w.EXPECT().Write(blob.Content).Return(0, errors.New("first write error")),
312
		w.EXPECT().Close().Return(nil),
313

314
		// Second write succeeds.
315
		w.EXPECT().Seek(int64(0), 0).Return(int64(0), nil),
316
		w.EXPECT().Write(blob.Content).Return(len(blob.Content), nil),
317
		w.EXPECT().Close().Return(nil),
318
	)
319

320
	tor, err := NewTorrent(mockCADS, blob.MetaInfo)
321
	require.NoError(err)
322

323
	// After the first write fails, the dirty bit should be flipped to empty,
324
	// allowing future writes to succeed.
325
	require.Error(tor.WritePiece(piecereader.NewBuffer(blob.Content), 0))
326
	require.NoError(tor.WritePiece(piecereader.NewBuffer(blob.Content), 0))
327
}
328

329
func TestTorrentRestoreCompletedTorrent(t *testing.T) {
330
	require := require.New(t)
331

332
	cads, cleanup := store.CADownloadStoreFixture()
333
	defer cleanup()
334

335
	blob := core.SizedBlobFixture(8, 1)
336

337
	prepareStore(cads, blob.MetaInfo)
338

339
	tor, err := NewTorrent(cads, blob.MetaInfo)
340
	require.NoError(err)
341

342
	for i, b := range blob.Content {
343
		require.NoError(tor.WritePiece(piecereader.NewBuffer([]byte{b}), i))
344
	}
345

346
	require.True(tor.Complete())
347

348
	tor, err = NewTorrent(cads, blob.MetaInfo)
349
	require.NoError(err)
350

351
	require.True(tor.Complete())
352
}
353

354
func TestTorrentRestoreInProgressTorrent(t *testing.T) {
355
	require := require.New(t)
356

357
	cads, cleanup := store.CADownloadStoreFixture()
358
	defer cleanup()
359

360
	blob := core.SizedBlobFixture(8, 1)
361

362
	prepareStore(cads, blob.MetaInfo)
363

364
	tor, err := NewTorrent(cads, blob.MetaInfo)
365
	require.NoError(err)
366

367
	pi := 4
368

369
	require.NoError(tor.WritePiece(piecereader.NewBuffer([]byte{blob.Content[pi]}), pi))
370
	require.Equal(int64(1), tor.BytesDownloaded())
371

372
	tor, err = NewTorrent(cads, blob.MetaInfo)
373
	require.NoError(err)
374

375
	require.Equal(int64(1), tor.BytesDownloaded())
376
	require.Equal(
377
		storage.ErrPieceComplete,
378
		tor.WritePiece(piecereader.NewBuffer([]byte{blob.Content[pi]}), pi))
379
}
380

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.