Dragonfly2

Форк
0
/
peertask_stream_resume_test.go 
282 строки · 7.9 Кб
1
/*
2
 *     Copyright 2023 The Dragonfly Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *      http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package peer
18

19
import (
20
	"context"
21
	"fmt"
22
	"io"
23
	"os"
24
	"sync"
25
	"testing"
26
	"time"
27

28
	"github.com/go-http-utils/headers"
29
	testifyassert "github.com/stretchr/testify/assert"
30
	"github.com/stretchr/testify/require"
31
	"go.uber.org/mock/gomock"
32
	"google.golang.org/grpc"
33
	"google.golang.org/grpc/credentials/insecure"
34

35
	commonv1 "d7y.io/api/v2/pkg/apis/common/v1"
36
	schedulerv1 "d7y.io/api/v2/pkg/apis/scheduler/v1"
37
	schedulerv1mocks "d7y.io/api/v2/pkg/apis/scheduler/v1/mocks"
38

39
	"d7y.io/dragonfly/v2/client/config"
40
	"d7y.io/dragonfly/v2/client/daemon/storage"
41
	"d7y.io/dragonfly/v2/client/daemon/test"
42
	"d7y.io/dragonfly/v2/client/util"
43
	"d7y.io/dragonfly/v2/internal/dferrors"
44
	"d7y.io/dragonfly/v2/pkg/net/http"
45
	schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client"
46
	clientmocks "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client/mocks"
47
	"d7y.io/dragonfly/v2/pkg/source"
48
	"d7y.io/dragonfly/v2/pkg/source/clients/httpprotocol"
49
	sourcemocks "d7y.io/dragonfly/v2/pkg/source/mocks"
50
)
51

52
func setupResumeStreamTaskComponents(ctrl *gomock.Controller, opt componentsOption) (
53
	schedulerclient.V1, storage.Manager) {
54
	// set up a scheduler to say back source only
55
	pps := schedulerv1mocks.NewMockScheduler_ReportPieceResultClient(ctrl)
56
	pps.EXPECT().Send(gomock.Any()).AnyTimes().DoAndReturn(
57
		func(pr *schedulerv1.PieceResult) error {
58
			return nil
59
		})
60
	pps.EXPECT().Recv().AnyTimes().DoAndReturn(
61
		func() (*schedulerv1.PeerPacket, error) {
62
			return nil, dferrors.New(commonv1.Code_SchedNeedBackSource, "")
63
		})
64
	pps.EXPECT().CloseSend().AnyTimes()
65

66
	sched := clientmocks.NewMockV1(ctrl)
67
	sched.EXPECT().RegisterPeerTask(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
68
		func(ctx context.Context, ptr *schedulerv1.PeerTaskRequest, opts ...grpc.CallOption) (*schedulerv1.RegisterResult, error) {
69
			return &schedulerv1.RegisterResult{
70
				TaskId:      opt.taskID,
71
				SizeScope:   commonv1.SizeScope_NORMAL,
72
				DirectPiece: nil,
73
			}, nil
74
		})
75
	sched.EXPECT().ReportPieceResult(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
76
		func(ctx context.Context, ptr *schedulerv1.PeerTaskRequest, opts ...grpc.CallOption) (schedulerv1.Scheduler_ReportPieceResultClient, error) {
77
			return pps, nil
78
		})
79
	sched.EXPECT().ReportPeerResult(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
80
		func(ctx context.Context, pr *schedulerv1.PeerResult, opts ...grpc.CallOption) error {
81
			return nil
82
		})
83

84
	// set up storage manager
85
	tempDir, _ := os.MkdirTemp("", "d7y-test-*")
86
	storageManager, _ := storage.NewStorageManager(
87
		config.SimpleLocalTaskStoreStrategy,
88
		&config.StorageOption{
89
			DataPath: tempDir,
90
			TaskExpireTime: util.Duration{
91
				Duration: -1 * time.Second,
92
			},
93
		}, func(request storage.CommonTaskRequest) {},
94
		os.FileMode(0700))
95
	return sched, storageManager
96
}
97

98
// intervalSleepReader is an in-memory io.ReadCloser that hands out data in
// chunks of at most size bytes and sleeps for interval on every chunk. The
// test uses it to imitate a slow source download.
type intervalSleepReader struct {
	// offset is the index in data of the next byte to return.
	offset int
	// size caps the number of bytes returned by a single Read call.
	// A non-positive size means "no cap": Read returns as much as fits in
	// the caller's buffer.
	size int
	// data is the full content served by the reader.
	data []byte
	// interval is slept on every Read call that copies data.
	interval time.Duration
}

// Compile-time check that the reader can be used as a response body.
var _ io.ReadCloser = (*intervalSleepReader)(nil)

// Read copies the next chunk of data into p, sleeps for the configured
// interval, and advances the offset by the number of bytes copied.
//
// It returns io.EOF together with the final bytes once the end of data is
// reached, and (0, io.EOF) on every call after that.
func (i *intervalSleepReader) Read(p []byte) (n int, err error) {
	if i.offset >= len(i.data) {
		return 0, io.EOF
	}

	// Default to "everything that is left"; only narrow the window when a
	// positive chunk size is configured and it is smaller than the remainder.
	// Comparing against the remainder (instead of computing offset+size first)
	// keeps a zero size from producing endless empty reads and a negative
	// size from producing an invalid slice range.
	end := len(i.data)
	if i.size > 0 && i.size < end-i.offset {
		end = i.offset + i.size
	}

	n = copy(p, i.data[i.offset:end])
	time.Sleep(i.interval)

	i.offset += n
	if i.offset >= len(i.data) {
		return n, io.EOF
	}
	return n, nil
}

// Close is a no-op; the reader holds no resources besides its byte slice.
func (i *intervalSleepReader) Close() error {
	return nil
}
127

128
func TestStreamPeerTask_Resume(t *testing.T) {
129
	assert := testifyassert.New(t)
130
	ctrl := gomock.NewController(t)
131

132
	testBytes, err := os.ReadFile(test.File)
133
	assert.Nil(err, "load test file")
134

135
	var (
136
		pieceParallelCount = int32(4)
137
		pieceSize          = 1024
138

139
		pieceDownloadInterval = time.Millisecond * 100
140

141
		mockContentLength = len(testBytes)
142
		//mockPieceCount    = int(math.Ceil(float64(mockContentLength) / float64(pieceSize)))
143

144
		peerID = "peer-resume-0"
145
		taskID = "task-resume-0"
146

147
		url = "http://localhost/test/data"
148
	)
149

150
	schedulerClient, storageManager := setupResumeStreamTaskComponents(
151
		ctrl,
152
		componentsOption{
153
			taskID:             taskID,
154
			contentLength:      int64(mockContentLength),
155
			pieceSize:          uint32(pieceSize),
156
			pieceParallelCount: pieceParallelCount,
157
			content:            testBytes,
158
		})
159
	defer storageManager.CleanUp()
160

161
	sourceClient := sourcemocks.NewMockResourceClient(ctrl)
162
	source.UnRegister("http")
163
	require.Nil(t, source.Register("http", sourceClient, httpprotocol.Adapter))
164
	defer source.UnRegister("http")
165
	sourceClient.EXPECT().Download(gomock.Any()).DoAndReturn(
166
		func(request *source.Request) (*source.Response, error) {
167
			response := source.NewResponse(
168
				&intervalSleepReader{
169
					size:     pieceSize,
170
					data:     testBytes,
171
					interval: pieceDownloadInterval,
172
				})
173
			response.ContentLength = int64(len(testBytes))
174
			return response, nil
175
		})
176

177
	pm := &pieceManager{
178
		calculateDigest: true,
179
		pieceDownloader: nil,
180
		computePieceSize: func(contentLength int64) uint32 {
181
			return uint32(pieceSize)
182
		},
183
	}
184
	ptm := &peerTaskManager{
185
		conductorLock:    &sync.Mutex{},
186
		runningPeerTasks: sync.Map{},
187
		trafficShaper:    NewTrafficShaper("plain", 0, nil),
188
		TaskManagerOption: TaskManagerOption{
189
			SchedulerClient: schedulerClient,
190
			TaskOption: TaskOption{
191
				CalculateDigest: true,
192
				PeerHost: &schedulerv1.PeerHost{
193
					Ip: "127.0.0.1",
194
				},
195
				PieceManager:   pm,
196
				StorageManager: storageManager,
197
				SchedulerOption: config.SchedulerOption{
198
					ScheduleTimeout: util.Duration{Duration: 10 * time.Minute},
199
				},
200
				GRPCDialTimeout: time.Second,
201
				GRPCCredentials: insecure.NewCredentials(),
202
			},
203
		},
204
	}
205
	req := &schedulerv1.PeerTaskRequest{
206
		Url: url,
207
		UrlMeta: &commonv1.UrlMeta{
208
			Tag: "d7y-test",
209
		},
210
		PeerId:   peerID,
211
		PeerHost: &schedulerv1.PeerHost{},
212
	}
213
	ctx := context.Background()
214
	wg := &sync.WaitGroup{}
215

216
	// set up parent task
217
	wg.Add(1)
218

219
	pt, err := ptm.newStreamTask(ctx, taskID, req, nil)
220
	assert.Nil(err, "new parent stream peer task")
221

222
	rc, _, err := pt.Start(ctx)
223
	assert.Nil(err, "start parent stream peer task")
224

225
	ptc := pt.peerTaskConductor
226

227
	go func() {
228
		outputBytes, err := io.ReadAll(rc)
229
		assert.Nil(err, "load read data")
230
		assert.Equal(testBytes, outputBytes, "output and desired output must match")
231
		wg.Done()
232
	}()
233

234
	ranges := []*http.Range{
235
		{
236
			Start:  0,
237
			Length: int64(mockContentLength),
238
		},
239
		{
240
			Start:  10,
241
			Length: int64(mockContentLength) - 10,
242
		},
243
		{
244
			Start:  100,
245
			Length: int64(mockContentLength) - 100,
246
		},
247
		{
248
			Start:  1000,
249
			Length: int64(mockContentLength) - 1000,
250
		},
251
		{
252
			Start:  1024,
253
			Length: int64(mockContentLength) - 1024,
254
		},
255
	}
256

257
	wg.Add(len(ranges))
258
	for _, rg := range ranges {
259
		go func(rg *http.Range) {
260
			pt := ptm.newResumeStreamTask(ctx, ptc, rg)
261
			assert.NotNil(pt, "new stream peer task")
262

263
			pt.computePieceSize = func(length int64) uint32 {
264
				return uint32(pieceSize)
265
			}
266

267
			rc, attr, err := pt.Start(ctx)
268
			assert.Nil(err, "start stream peer task")
269

270
			assert.Equal(attr[headers.ContentLength], fmt.Sprintf("%d", rg.Length), "content length should match")
271
			assert.Equal(attr[headers.ContentRange], fmt.Sprintf("bytes %d-%d/%d", rg.Start, mockContentLength-1, mockContentLength), "content length should match")
272

273
			outputBytes, err := io.ReadAll(rc)
274
			assert.Nil(err, "load read data")
275
			assert.Equal(len(testBytes[rg.Start:rg.Start+rg.Length]), len(outputBytes), "output and desired output length must match")
276
			assert.Equal(string(testBytes[rg.Start:rg.Start+rg.Length]), string(outputBytes), "output and desired output must match")
277
			wg.Done()
278
		}(rg)
279
	}
280

281
	wg.Wait()
282
}
283

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.