/*
 *     Copyright 2020 The Dragonfly Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

//go:generate mockgen -destination peertask_manager_mock.go -source peertask_manager.go -package peer

package peer

import (
	"context"
	"errors"
	"fmt"
	"io"
	"sync"

	"github.com/go-http-utils/headers"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/trace"
	"golang.org/x/time/rate"
	"google.golang.org/grpc/status"

	commonv1 "d7y.io/api/v2/pkg/apis/common/v1"
	schedulerv1 "d7y.io/api/v2/pkg/apis/scheduler/v1"

	"d7y.io/dragonfly/v2/client/daemon/metrics"
	"d7y.io/dragonfly/v2/client/daemon/storage"
	logger "d7y.io/dragonfly/v2/internal/dflog"
	"d7y.io/dragonfly/v2/internal/util"
	"d7y.io/dragonfly/v2/pkg/idgen"
	nethttp "d7y.io/dragonfly/v2/pkg/net/http"
	schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client"
)

// TaskManager processes all peer task requests
type TaskManager interface {
	// StartFileTask starts a peer task to download a file and returns a progress
	// channel for reporting download progress
	// tiny means the task file is tiny and the task is already done
	StartFileTask(ctx context.Context, req *FileTaskRequest) (
		progress chan *FileTaskProgress, err error)
	// StartStreamTask starts a peer task with stream io
	StartStreamTask(ctx context.Context, req *StreamTaskRequest) (
		readCloser io.ReadCloser, attribute map[string]string, err error)
	// StartSeedTask starts a seed peer task
	StartSeedTask(ctx context.Context, req *SeedTaskRequest) (
		seedTaskResult *SeedTaskResponse, reuse bool, err error)

	Subscribe(request *commonv1.PieceTaskRequest) (*SubscribeResponse, bool)

	IsPeerTaskRunning(taskID string, peerID string) (Task, bool)

	// StatTask checks whether the given task exists in the P2P network
	StatTask(ctx context.Context, taskID string) (*schedulerv1.Task, error)

	// AnnouncePeerTask announces peer task info to the P2P network
	AnnouncePeerTask(ctx context.Context, meta storage.PeerTaskMetadata, url string, taskType commonv1.TaskType, urlMeta *commonv1.UrlMeta) error

	GetPieceManager() PieceManager

	// Stop stops the PeerTaskManager
	Stop(ctx context.Context) error
}

// Task represents the common interface to operate a peer task
type Task interface {
	Logger
	Context() context.Context
	Log() *logger.SugaredLoggerOnWith

	GetStorage() storage.TaskStorageDriver

	GetPeerID() string
	GetTaskID() string

	GetTotalPieces() int32
	SetTotalPieces(int32)

	GetContentLength() int64
	SetContentLength(int64)

	AddTraffic(uint64)
	GetTraffic() uint64

	SetPieceMd5Sign(string)
	GetPieceMd5Sign() string

	PublishPieceInfo(pieceNum int32, size uint32)
	ReportPieceResult(request *DownloadPieceRequest, result *DownloadPieceResult, err error)

	UpdateSourceErrorStatus(st *status.Status)
}

type Logger interface {
	Log() *logger.SugaredLoggerOnWith
}

type TinyData struct {
	TaskID  string
	PeerID  string
	Content []byte
}

var tracer trace.Tracer

func init() {
	tracer = otel.Tracer("dfget-daemon")
}

type peerTaskManager struct {
	TaskManagerOption
	conductorLock    sync.Locker
	runningPeerTasks sync.Map
	trafficShaper    TrafficShaper
}

type TaskManagerOption struct {
	TaskOption
	SchedulerClient   schedulerclient.V1
	PerPeerRateLimit  rate.Limit
	TotalRateLimit    rate.Limit
	TrafficShaperType string
	// Multiplex indicates whether to reuse the data of completed peer tasks
	Multiplex bool
	// Prefetch indicates whether to prefetch the whole file for ranged requests
	Prefetch          bool
	GetPiecesMaxRetry int
	SplitRunningTasks bool
}

func NewPeerTaskManager(opt *TaskManagerOption) (TaskManager, error) {
	ptm := &peerTaskManager{
		TaskManagerOption: *opt,
		runningPeerTasks:  sync.Map{},
		conductorLock:     &sync.Mutex{},
		trafficShaper:     NewTrafficShaper(opt.TrafficShaperType, opt.TotalRateLimit, util.ComputePieceSize),
	}
	ptm.trafficShaper.Start()
	return ptm, nil
}
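
// Example (editor's illustrative sketch, not part of the upstream file): wiring a
// TaskManager from its options. Only fields visible in this file are set; the
// embedded TaskOption (peer host, storage manager, piece manager, etc.) is assumed
// to be prepared elsewhere by the daemon, and the numeric limits and the
// "sampling" shaper name are placeholder assumptions, not recommendations.
func exampleNewTaskManager(base TaskOption, sched schedulerclient.V1) (TaskManager, error) {
	return NewPeerTaskManager(&TaskManagerOption{
		TaskOption:        base,
		SchedulerClient:   sched,
		PerPeerRateLimit:  rate.Limit(64 * 1024 * 1024),  // example: 64 MiB/s per peer task
		TotalRateLimit:    rate.Limit(512 * 1024 * 1024), // example: 512 MiB/s across all tasks
		TrafficShaperType: "sampling",                    // assumed shaper type accepted by NewTrafficShaper
		Multiplex:         true,                          // reuse data of completed tasks
		Prefetch:          false,
		GetPiecesMaxRetry: 100,
		SplitRunningTasks: false,
	})
}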

func (ptm *peerTaskManager) findPeerTaskConductor(key string) (*peerTaskConductor, bool) {
	pt, ok := ptm.runningPeerTasks.Load(key)
	if !ok {
		return nil, false
	}
	return pt.(*peerTaskConductor), true
}

func (ptm *peerTaskManager) getPeerTaskConductor(ctx context.Context,
	taskID string,
	request *schedulerv1.PeerTaskRequest,
	limit rate.Limit,
	parent *peerTaskConductor,
	rg *nethttp.Range,
	desiredLocation string,
	seed bool) (*peerTaskConductor, error) {
	var (
		ptc     *peerTaskConductor
		created bool
		err     error
	)

	if ptm.SplitRunningTasks {
		ptc, created, err = ptm.createSplitedPeerTaskConductor(
			ctx, taskID, request, limit, parent, rg, desiredLocation, seed)
	} else {
		ptc, created, err = ptm.getOrCreatePeerTaskConductor(
			ctx, taskID, request, limit, parent, rg, desiredLocation, seed)
	}

	if err != nil {
		return nil, err
	}

	if created {
		if err = ptc.start(); err != nil {
			return nil, err
		}
	}
	return ptc, err
}

// getOrCreatePeerTaskConductor gets or creates a peerTaskConductor:
// if a new one was created, it returns (ptc, true, nil), otherwise (ptc, false, nil)
func (ptm *peerTaskManager) getOrCreatePeerTaskConductor(
	ctx context.Context,
	taskID string,
	request *schedulerv1.PeerTaskRequest,
	limit rate.Limit,
	parent *peerTaskConductor,
	rg *nethttp.Range,
	desiredLocation string,
	seed bool) (*peerTaskConductor, bool, error) {
	if ptc, ok := ptm.findPeerTaskConductor(taskID); ok {
		logger.Debugf("peer task found: %s/%s", ptc.taskID, ptc.peerID)
		return ptc, false, nil
	}
	ptc := ptm.newPeerTaskConductor(ctx, request, limit, parent, rg, seed)

	ptm.conductorLock.Lock()
	// double-check under the lock in case another goroutine stored a conductor first
	if p, ok := ptm.findPeerTaskConductor(taskID); ok {
		ptm.conductorLock.Unlock()
		logger.Debugf("peer task found: %s/%s", p.taskID, p.peerID)
		if seed && !p.seed && !p.needBackSource.Load() {
			p.Warnf("new seed request received, switch to back source, may be produced by multiple schedulers")
			p.markBackSource()
		}
		metrics.PeerTaskCacheHitCount.Add(1)
		return p, false, nil
	}
	ptm.runningPeerTasks.Store(taskID, ptc)
	ptm.conductorLock.Unlock()
	metrics.PeerTaskCount.Add(1)
	logger.Debugf("peer task created: %s/%s", ptc.taskID, ptc.peerID)

	err := ptc.initStorage(desiredLocation)
	if err != nil {
		ptc.Errorf("init storage error: %s", err)
		ptc.cancelNotRegisterred(commonv1.Code_ClientError, err.Error())
		return nil, false, err
	}
	return ptc, true, nil
}

func (ptm *peerTaskManager) createSplitedPeerTaskConductor(
	ctx context.Context,
	taskID string,
	request *schedulerv1.PeerTaskRequest,
	limit rate.Limit,
	parent *peerTaskConductor,
	rg *nethttp.Range,
	desiredLocation string,
	seed bool) (*peerTaskConductor, bool, error) {
	ptc := ptm.newPeerTaskConductor(ctx, request, limit, parent, rg, seed)

	ptm.runningPeerTasks.Store(taskID+"/"+ptc.peerID, ptc)
	metrics.PeerTaskCount.Add(1)
	logger.Debugf("standalone peer task created: %s/%s", ptc.taskID, ptc.peerID)

	err := ptc.initStorage(desiredLocation)
	if err != nil {
		ptc.Errorf("init storage error: %s", err)
		ptc.cancelNotRegisterred(commonv1.Code_ClientError, err.Error())
		return nil, false, err
	}
	return ptc, true, nil
}

func (ptm *peerTaskManager) enabledPrefetch(rg *nethttp.Range) bool {
	return ptm.Prefetch && rg != nil
}

func (ptm *peerTaskManager) prefetchParentTask(request *schedulerv1.PeerTaskRequest, desiredLocation string) *peerTaskConductor {
	req := &schedulerv1.PeerTaskRequest{
		Url:         request.Url,
		PeerId:      request.PeerId,
		PeerHost:    ptm.PeerHost,
		IsMigrating: request.IsMigrating,
		UrlMeta: &commonv1.UrlMeta{
			Digest: request.UrlMeta.Digest,
			Tag:    request.UrlMeta.Tag,
			Filter: request.UrlMeta.Filter,
			Header: map[string]string{},
		},
	}
	for k, v := range request.UrlMeta.Header {
		if k == headers.Range {
			continue
		}
		req.UrlMeta.Header[k] = v
	}
	taskID := idgen.TaskIDV1(req.Url, req.UrlMeta)
	req.PeerId = idgen.PeerIDV1(req.PeerHost.Ip)

	var limit = rate.Inf
	if ptm.PerPeerRateLimit > 0 {
		limit = ptm.PerPeerRateLimit
	}

	logger.Infof("prefetch peer task %s/%s", taskID, req.PeerId)
	prefetch, err := ptm.getPeerTaskConductor(context.Background(), taskID, req, limit, nil, nil, desiredLocation, false)
	if err != nil {
		logger.Errorf("prefetch peer task %s error: %s", taskID, err)
		return nil
	}

	if prefetch != nil && prefetch.peerID == req.PeerId {
		metrics.PrefetchTaskCount.Add(1)
	}
	return prefetch
}

func (ptm *peerTaskManager) StartFileTask(ctx context.Context, req *FileTaskRequest) (chan *FileTaskProgress, error) {
	if req.KeepOriginalOffset && !ptm.Prefetch {
		return nil, fmt.Errorf("please enable prefetch when using the original offset feature")
	}
	if ptm.Multiplex {
		progress, ok := ptm.tryReuseFilePeerTask(ctx, req)
		if ok {
			metrics.PeerTaskCacheHitCount.Add(1)
			return progress, nil
		}
	}
	// TODO ensure scheduler is ok first
	var limit = rate.Inf
	if ptm.PerPeerRateLimit > 0 {
		limit = ptm.PerPeerRateLimit
	}
	if req.Limit > 0 {
		limit = rate.Limit(req.Limit)
	}
	ctx, pt, err := ptm.newFileTask(ctx, req, limit)
	if err != nil {
		return nil, err
	}

	// FIXME when failed due to SchedulerClient error, relocate SchedulerClient and retry
	progress, err := pt.Start(ctx)
	return progress, err
}
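
// Example (editor's illustrative sketch, not part of the upstream file): driving
// StartFileTask and draining the progress channel, assuming the channel is closed
// once the task finishes. The request is assumed to be fully populated by the
// caller; individual FileTaskProgress fields are intentionally not inspected here.
func exampleDownloadFile(ctx context.Context, tm TaskManager, req *FileTaskRequest) error {
	progress, err := tm.StartFileTask(ctx, req)
	if err != nil {
		return err
	}
	// Ranging over the channel is a simple way to wait for completion,
	// under the closure assumption stated above.
	for p := range progress {
		_ = p // a real caller would report state, completed length, etc.
	}
	return nil
}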

func (ptm *peerTaskManager) StartStreamTask(ctx context.Context, req *StreamTaskRequest) (io.ReadCloser, map[string]string, error) {
	peerTaskRequest := &schedulerv1.PeerTaskRequest{
		Url:         req.URL,
		UrlMeta:     req.URLMeta,
		PeerId:      req.PeerID,
		PeerHost:    ptm.PeerHost,
		IsMigrating: false,
	}

	taskID := idgen.TaskIDV1(req.URL, req.URLMeta)
	if ptm.Multiplex {
		// try breakpoint resume for tasks with a range header
		if req.Range != nil && !ptm.SplitRunningTasks {
			// find the running parent task
			parentTaskID := idgen.ParentTaskIDV1(req.URL, req.URLMeta)
			parentTask, ok := ptm.findPeerTaskConductor(parentTaskID)
			if ok && parentTask.GetContentLength() > 0 {
				// only allow resume for a range from the breakpoint to the end
				if req.Range.Start+req.Range.Length == parentTask.GetContentLength() {
					pt := ptm.newResumeStreamTask(ctx, parentTask, req.Range)
					return pt.Start(ctx)
				}
			}
		}

		// reuse data from a completed task
		r, attr, ok := ptm.tryReuseStreamPeerTask(ctx, taskID, req)
		if ok {
			metrics.PeerTaskCacheHitCount.Add(1)
			return r, attr, nil
		}
	}

	pt, err := ptm.newStreamTask(ctx, taskID, peerTaskRequest, req.Range)
	if err != nil {
		return nil, nil, err
	}

	// FIXME when failed due to SchedulerClient error, relocate SchedulerClient and retry
	readCloser, attribute, err := pt.Start(ctx)
	return readCloser, attribute, err
}
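
// Example (editor's illustrative sketch, not part of the upstream file): streaming
// a task and copying its body to a writer. The request (URL, URLMeta, PeerID and
// optional Range) is assumed to be prepared by the caller; the attribute map is
// assumed to carry response metadata such as headers and is ignored here.
func exampleStreamToWriter(ctx context.Context, tm TaskManager, req *StreamTaskRequest, w io.Writer) error {
	rc, attr, err := tm.StartStreamTask(ctx, req)
	if err != nil {
		return err
	}
	defer rc.Close()
	_ = attr // a real caller might propagate these attributes to its own response
	_, err = io.Copy(w, rc)
	return err
}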

func (ptm *peerTaskManager) StartSeedTask(ctx context.Context, req *SeedTaskRequest) (response *SeedTaskResponse, reuse bool, err error) {
	response, ok := ptm.tryReuseSeedPeerTask(ctx, req)
	if ok {
		metrics.PeerTaskCacheHitCount.Add(1)
		return response, true, nil
	}

	var limit = rate.Inf
	if ptm.PerPeerRateLimit > 0 {
		limit = ptm.PerPeerRateLimit
	}
	if req.Limit > 0 {
		limit = rate.Limit(req.Limit)
	}

	response, err = ptm.newSeedTask(ctx, req, limit)
	if err != nil {
		return nil, false, err
	}

	return response, false, nil
}

type SubscribeResponse struct {
	Storage          storage.TaskStorageDriver
	PieceInfoChannel chan *PieceInfo
	Success          chan struct{}
	Fail             chan struct{}
	FailReason       func() error
}

func (ptm *peerTaskManager) getRunningTaskKey(taskID, peerID string) string {
	if ptm.SplitRunningTasks {
		return taskID + "/" + peerID
	}
	return taskID
}

func (ptm *peerTaskManager) Subscribe(request *commonv1.PieceTaskRequest) (*SubscribeResponse, bool) {
	ptc, ok := ptm.findPeerTaskConductor(ptm.getRunningTaskKey(request.TaskId, request.DstPid))
	if !ok {
		return nil, false
	}

	result := &SubscribeResponse{
		Storage:          ptc.storage,
		PieceInfoChannel: ptc.broker.Subscribe(),
		Success:          ptc.successCh,
		Fail:             ptc.failCh,
		FailReason:       ptc.getFailedError,
	}
	return result, true
}
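
// Example (editor's illustrative sketch, not part of the upstream file): how a
// piece-serving component might consume a SubscribeResponse, reacting to newly
// published pieces until the task either succeeds or fails.
func exampleConsumeSubscription(resp *SubscribeResponse, handle func(*PieceInfo)) error {
	for {
		select {
		case info := <-resp.PieceInfoChannel:
			handle(info) // a new piece has been published; its data is reachable via resp.Storage
		case <-resp.Success:
			return nil // task finished successfully
		case <-resp.Fail:
			return resp.FailReason() // task failed; surface the recorded error
		}
	}
}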

func (ptm *peerTaskManager) Stop(ctx context.Context) error {
	// TODO
	if ptm.trafficShaper != nil {
		ptm.trafficShaper.Stop()
	}
	return nil
}

func (ptm *peerTaskManager) PeerTaskDone(taskID, peerID string) {
	key := ptm.getRunningTaskKey(taskID, peerID)
	logger.Debugf("delete done task %s in running tasks", key)
	ptm.runningPeerTasks.Delete(key)
	if ptm.trafficShaper != nil {
		ptm.trafficShaper.RemoveTask(key)
	}
}

func (ptm *peerTaskManager) IsPeerTaskRunning(taskID, peerID string) (Task, bool) {
	ptc, ok := ptm.runningPeerTasks.Load(ptm.getRunningTaskKey(taskID, peerID))
	if ok {
		return ptc.(*peerTaskConductor), ok
	}
	return nil, ok
}

func (ptm *peerTaskManager) StatTask(ctx context.Context, taskID string) (*schedulerv1.Task, error) {
	req := &schedulerv1.StatTaskRequest{
		TaskId: taskID,
	}

	return ptm.SchedulerClient.StatTask(ctx, req)
}

func (ptm *peerTaskManager) GetPieceManager() PieceManager {
	return ptm.PieceManager
}

func (ptm *peerTaskManager) AnnouncePeerTask(ctx context.Context, meta storage.PeerTaskMetadata, url string, taskType commonv1.TaskType, urlMeta *commonv1.UrlMeta) error {
	// Check if the given task is completed in local StorageManager.
	if ptm.StorageManager.FindCompletedTask(meta.TaskID) == nil {
		return errors.New("task not found in local storage")
	}

	// Prepare AnnounceTaskRequest.
	totalPieces, err := ptm.StorageManager.GetTotalPieces(ctx, &meta)
	if err != nil {
		return err
	}

	piecePacket, err := ptm.StorageManager.GetPieces(ctx, &commonv1.PieceTaskRequest{
		TaskId:   meta.TaskID,
		DstPid:   meta.PeerID,
		StartNum: 0,
		Limit:    uint32(totalPieces),
	})
	if err != nil {
		return err
	}
	piecePacket.DstAddr = fmt.Sprintf("%s:%d", ptm.PeerHost.Ip, ptm.PeerHost.DownPort)

	// Announce peer task to scheduler
	if err := ptm.SchedulerClient.AnnounceTask(ctx, &schedulerv1.AnnounceTaskRequest{
		TaskId:      meta.TaskID,
		TaskType:    taskType,
		Url:         url,
		UrlMeta:     urlMeta,
		PeerHost:    ptm.PeerHost,
		PiecePacket: piecePacket,
	}); err != nil {
		return err
	}

	return nil
}
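
// Example (editor's illustrative sketch, not part of the upstream file): a possible
// re-seeding flow that stats a task in the P2P network and, if the scheduler does
// not know it, announces the locally completed copy. The meta/url/urlMeta values
// are assumed to describe a task already present in local storage; treating any
// StatTask error as "not found" is a simplification for illustration only.
func exampleReannounceIfUnknown(ctx context.Context, tm TaskManager, meta storage.PeerTaskMetadata,
	url string, taskType commonv1.TaskType, urlMeta *commonv1.UrlMeta) error {
	if _, err := tm.StatTask(ctx, meta.TaskID); err == nil {
		return nil // the P2P network already knows this task
	}
	return tm.AnnouncePeerTask(ctx, meta, url, taskType, urlMeta)
}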