Dragonfly2

Форк
0
/
peertask_reuse.go 
415 строк · 13.7 Кб
1
/*
2
 *     Copyright 2020 The Dragonfly Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *      http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package peer
18

19
import (
20
	"context"
21
	"fmt"
22
	"io"
23
	"os"
24
	"time"
25

26
	"github.com/go-http-utils/headers"
27
	semconv "go.opentelemetry.io/otel/semconv/v1.7.0"
28
	"go.opentelemetry.io/otel/trace"
29

30
	commonv1 "d7y.io/api/v2/pkg/apis/common/v1"
31

32
	"d7y.io/dragonfly/v2/client/config"
33
	"d7y.io/dragonfly/v2/client/daemon/storage"
34
	logger "d7y.io/dragonfly/v2/internal/dflog"
35
	"d7y.io/dragonfly/v2/pkg/idgen"
36
	"d7y.io/dragonfly/v2/pkg/net/http"
37
)
38

39
// Blank declaration that references the logger package's SugaredLoggerOnWith
// type without creating a usable variable.
// NOTE(review): the original note reads "pin this package for no log code
// generation" — presumably this keeps the logger import referenced for a
// code-generation step; confirm against the build tooling.
var _ *logger.SugaredLoggerOnWith
40

41
// Reuse task search logic:
// A. prefetch feature enabled
//    for a ranged request: 1. find a completed sub task, 2. find a partially completed parent task
//    for a non-ranged request: just find a completed task
// B. prefetch feature disabled
//    for a ranged request: 1. find a completed normal task, 2. find a partially completed parent task
//    for a non-ranged request: just find a completed task
48

49
func (ptm *peerTaskManager) tryReuseFilePeerTask(ctx context.Context,
50
	request *FileTaskRequest) (chan *FileTaskProgress, bool) {
51
	taskID := idgen.TaskIDV1(request.Url, request.UrlMeta)
52
	var (
53
		reuse      *storage.ReusePeerTask
54
		reuseRange *http.Range // the range of parent peer task data to read
55
		log        *logger.SugaredLoggerOnWith
56
		length     int64
57
		err        error
58
	)
59

60
	if ptm.enabledPrefetch(request.Range) {
61
		reuse = ptm.StorageManager.FindCompletedSubTask(taskID)
62
	} else {
63
		reuse = ptm.StorageManager.FindCompletedTask(taskID)
64
	}
65

66
	if reuse == nil {
67
		if request.Range == nil {
68
			return nil, false
69
		}
70
		// for ranged request, check the parent task
71
		reuseRange = request.Range
72
		taskID = idgen.ParentTaskIDV1(request.Url, request.UrlMeta)
73
		reuse = ptm.StorageManager.FindPartialCompletedTask(taskID, reuseRange)
74
		if reuse == nil {
75
			return nil, false
76
		}
77
	}
78

79
	logKV := []any{
80
		"peer", request.PeerId,
81
		"task", taskID,
82
	}
83

84
	if spanContext := trace.SpanFromContext(ctx).SpanContext(); spanContext.TraceID().IsValid() {
85
		logKV = append(logKV, "trace", spanContext.TraceID().String())
86
	}
87

88
	if reuseRange == nil {
89
		logKV = append(logKV, "component", "reuseFilePeerTask")
90
		log = logger.With(logKV...)
91
		log.Infof("reuse from peer task: %s, total size: %d", reuse.PeerID, reuse.ContentLength)
92
		length = reuse.ContentLength
93
	} else {
94
		logKV = append(logKV, "range", request.UrlMeta.Range, "component", "reuseRangeFilePeerTask")
95
		log = logger.With(logKV...)
96
		log.Infof("reuse partial data from peer task: %s, total size: %d, range: %s",
97
			reuse.PeerID, reuse.ContentLength, request.UrlMeta.Range)
98

99
		// correct range like: bytes=1024-
100
		if reuseRange.Start+reuseRange.Length > reuse.ContentLength {
101
			reuseRange.Length = reuse.ContentLength - reuseRange.Start
102
			if reuseRange.Length < 0 {
103
				return nil, false
104
			}
105
		}
106
		length = reuseRange.Length
107
	}
108

109
	_, span := tracer.Start(ctx, config.SpanReusePeerTask, trace.WithSpanKind(trace.SpanKindClient))
110
	span.SetAttributes(config.AttributePeerHost.String(ptm.PeerHost.Id))
111
	span.SetAttributes(semconv.NetHostIPKey.String(ptm.PeerHost.Ip))
112
	span.SetAttributes(config.AttributeTaskID.String(taskID))
113
	span.SetAttributes(config.AttributePeerID.String(request.PeerId))
114
	span.SetAttributes(config.AttributeReusePeerID.String(reuse.PeerID))
115
	span.SetAttributes(semconv.HTTPURLKey.String(request.Url))
116
	if reuseRange != nil {
117
		span.SetAttributes(config.AttributeReuseRange.String(request.UrlMeta.Range))
118
	}
119
	defer span.End()
120

121
	log.Infof("reuse from peer task: %s, total size: %d, target size: %d", reuse.PeerID, reuse.ContentLength, length)
122
	span.AddEvent("reuse peer task", trace.WithAttributes(config.AttributePeerID.String(reuse.PeerID)))
123

124
	start := time.Now()
125
	if reuseRange == nil || request.KeepOriginalOffset {
126
		storeRequest := &storage.StoreRequest{
127
			CommonTaskRequest: storage.CommonTaskRequest{
128
				PeerID:      reuse.PeerID,
129
				TaskID:      taskID,
130
				Destination: request.Output,
131
			},
132
			MetadataOnly:   false,
133
			StoreDataOnly:  true,
134
			TotalPieces:    reuse.TotalPieces,
135
			OriginalOffset: request.KeepOriginalOffset,
136
		}
137
		err = ptm.StorageManager.Store(ctx, storeRequest)
138
	} else {
139
		err = ptm.storePartialFile(ctx, request, log, reuse, reuseRange)
140
	}
141

142
	if err != nil {
143
		log.Errorf("store error when reuse peer task: %s", err)
144
		span.SetAttributes(config.AttributePeerTaskSuccess.Bool(false))
145
		span.RecordError(err)
146
		return nil, false
147
	}
148

149
	// check reuse target is valid
150
	stat, err := os.Stat(request.Output)
151
	if err != nil {
152
		log.Errorf("stat error when reuse peer task: %s", err)
153
		span.SetAttributes(config.AttributePeerTaskSuccess.Bool(false))
154
		span.RecordError(err)
155
		return nil, false
156
	}
157

158
	if request.KeepOriginalOffset {
159
		// KeepOriginalOffset case
160
		if length > 0 && stat.Size() == 0 {
161
			err = fmt.Errorf("reuse failed, output file size is zero, but target length %d is not zero", length)
162
			log.Errorf(err.Error())
163
			span.SetAttributes(config.AttributePeerTaskSuccess.Bool(false))
164
			span.RecordError(err)
165
			return nil, false
166
		}
167
	} else if length != stat.Size() {
168
		// normal case
169
		err = fmt.Errorf("reuse failed, output file size %d is not same with target length %d", stat.Size(), length)
170
		log.Errorf(err.Error())
171
		span.SetAttributes(config.AttributePeerTaskSuccess.Bool(false))
172
		span.RecordError(err)
173
		return nil, false
174
	}
175

176
	var cost = time.Since(start).Milliseconds()
177
	log.Infof("reuse file peer task done, cost: %dms", cost)
178

179
	pg := &FileTaskProgress{
180
		State: &ProgressState{
181
			Success: true,
182
			Code:    commonv1.Code_Success,
183
			Msg:     "Success",
184
		},
185
		TaskID:          taskID,
186
		PeerID:          request.PeerId,
187
		ContentLength:   length,
188
		CompletedLength: length,
189
		PeerTaskDone:    true,
190
		DoneCallback:    func() {},
191
	}
192

193
	// make a new buffered channel, because we did not need to call newFileTask
194
	progressCh := make(chan *FileTaskProgress, 1)
195
	progressCh <- pg
196

197
	span.SetAttributes(config.AttributePeerTaskSuccess.Bool(true))
198
	span.SetAttributes(config.AttributePeerTaskCost.Int64(cost))
199
	return progressCh, true
200
}
201

202
func (ptm *peerTaskManager) storePartialFile(ctx context.Context, request *FileTaskRequest,
203
	log *logger.SugaredLoggerOnWith, reuse *storage.ReusePeerTask, rg *http.Range) error {
204
	f, err := os.OpenFile(request.Output, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
205
	if err != nil {
206
		log.Errorf("open dest file error when reuse peer task: %s", err)
207
		return err
208
	}
209
	rc, err := ptm.StorageManager.ReadAllPieces(ctx,
210
		&storage.ReadAllPiecesRequest{PeerTaskMetadata: reuse.PeerTaskMetadata, Range: rg})
211
	if err != nil {
212
		log.Errorf("read pieces error when reuse peer task: %s", err)
213
		return err
214
	}
215
	defer rc.Close()
216
	n, err := io.Copy(f, rc)
217
	if err != nil {
218
		log.Errorf("copy data error when reuse peer task: %s", err)
219
		return err
220
	}
221
	if n != rg.Length {
222
		log.Errorf("copy data length not match when reuse peer task, actual: %d, desire: %d", n, rg.Length)
223
		return io.ErrShortBuffer
224
	}
225
	return nil
226
}
227

228
func (ptm *peerTaskManager) tryReuseStreamPeerTask(ctx context.Context, taskID string,
229
	request *StreamTaskRequest) (io.ReadCloser, map[string]string, bool) {
230
	var (
231
		reuse      *storage.ReusePeerTask
232
		reuseRange *http.Range // the range of parent peer task data to read
233
		log        *logger.SugaredLoggerOnWith
234
		length     int64
235
	)
236

237
	if ptm.enabledPrefetch(request.Range) {
238
		reuse = ptm.StorageManager.FindCompletedSubTask(taskID)
239
	} else {
240
		reuse = ptm.StorageManager.FindCompletedTask(taskID)
241
	}
242

243
	if reuse == nil {
244
		if request.Range == nil {
245
			return nil, nil, false
246
		}
247
		// for ranged request, check the parent task
248
		reuseRange = request.Range
249
		taskID = idgen.ParentTaskIDV1(request.URL, request.URLMeta)
250
		reuse = ptm.StorageManager.FindPartialCompletedTask(taskID, reuseRange)
251
		if reuse == nil {
252
			return nil, nil, false
253
		}
254
	}
255

256
	logKV := []any{
257
		"peer", request.PeerID,
258
		"task", taskID,
259
	}
260
	if spanContext := trace.SpanFromContext(ctx).SpanContext(); spanContext.TraceID().IsValid() {
261
		logKV = append(logKV, "trace", spanContext.TraceID().String())
262
	}
263

264
	if reuseRange == nil {
265
		logKV = append(logKV, "component", "reuseStreamPeerTask")
266
		log = logger.With(logKV...)
267
		log.Infof("reuse from peer task: %s, total size: %d", reuse.PeerID, reuse.ContentLength)
268
		length = reuse.ContentLength
269
	} else {
270
		logKV = append(logKV, "component", "reuseRangeStreamPeerTask")
271
		log = logger.With(logKV...)
272
		log.Infof("reuse partial data from peer task: %s, total size: %d, range: %s",
273
			reuse.PeerID, reuse.ContentLength, request.URLMeta.Range)
274

275
		// correct range like: bytes=1024-
276
		if reuseRange.Length > reuse.ContentLength-reuseRange.Start {
277
			reuseRange.Length = reuse.ContentLength - reuseRange.Start
278
			if reuseRange.Length < 0 {
279
				return nil, nil, false
280
			}
281
		}
282
		length = reuseRange.Length
283
	}
284

285
	ctx, span := tracer.Start(ctx, config.SpanStreamTask, trace.WithSpanKind(trace.SpanKindClient))
286
	span.SetAttributes(config.AttributePeerHost.String(ptm.PeerHost.Id))
287
	span.SetAttributes(semconv.NetHostIPKey.String(ptm.PeerHost.Ip))
288
	span.SetAttributes(config.AttributeTaskID.String(taskID))
289
	span.SetAttributes(config.AttributePeerID.String(request.PeerID))
290
	span.SetAttributes(config.AttributeReusePeerID.String(reuse.PeerID))
291
	span.SetAttributes(semconv.HTTPURLKey.String(request.URL))
292
	if reuseRange != nil {
293
		span.SetAttributes(config.AttributeReuseRange.String(request.URLMeta.Range))
294
	}
295
	defer span.End()
296

297
	rc, err := ptm.StorageManager.ReadAllPieces(ctx,
298
		&storage.ReadAllPiecesRequest{PeerTaskMetadata: reuse.PeerTaskMetadata, Range: reuseRange})
299
	if err != nil {
300
		log.Errorf("read pieces error when reuse peer task: %s", err)
301
		span.SetAttributes(config.AttributePeerTaskSuccess.Bool(false))
302
		span.RecordError(err)
303
		return nil, nil, false
304
	}
305

306
	exa, err := ptm.StorageManager.GetExtendAttribute(ctx, &reuse.PeerTaskMetadata)
307
	if err != nil {
308
		log.Errorf("get extend attribute error when reuse peer task: %s", err)
309
		span.SetAttributes(config.AttributePeerTaskSuccess.Bool(false))
310
		span.RecordError(err)
311
		return nil, nil, false
312
	}
313

314
	attr := map[string]string{}
315
	attr[config.HeaderDragonflyTask] = taskID
316
	attr[config.HeaderDragonflyPeer] = request.PeerID
317
	attr[headers.ContentLength] = fmt.Sprintf("%d", length)
318

319
	if exa != nil {
320
		for k, v := range exa.Header {
321
			attr[k] = v
322
		}
323
	}
324

325
	if reuseRange != nil {
326
		attr[config.HeaderDragonflyRange] = request.URLMeta.Range
327
		attr[headers.ContentRange] = fmt.Sprintf("bytes %d-%d/%d", reuseRange.Start,
328
			reuseRange.Start+reuseRange.Length-1, reuse.ContentLength)
329
	} else if request.Range != nil {
330
		// the length is from reuse task, ensure it equal with request
331
		if length != request.Range.Length {
332
			log.Errorf("target task length %d did not match range length %d", length, request.Range.Length)
333
			return nil, nil, false
334
		}
335
		attr[headers.ContentRange] = fmt.Sprintf("bytes %d-%d/*", request.Range.Start,
336
			request.Range.Start+request.Range.Length-1)
337
	}
338

339
	// TODO record time when file closed, need add a type to implement Close and WriteTo
340
	span.SetAttributes(config.AttributePeerTaskSuccess.Bool(true))
341
	return rc, attr, true
342
}
343

344
func (ptm *peerTaskManager) tryReuseSeedPeerTask(ctx context.Context,
345
	request *SeedTaskRequest) (*SeedTaskResponse, bool) {
346
	taskID := idgen.TaskIDV1(request.Url, request.UrlMeta)
347
	var (
348
		reuse      *storage.ReusePeerTask
349
		reuseRange *http.Range // the range of parent peer task data to read
350
		log        *logger.SugaredLoggerOnWith
351
	)
352

353
	if ptm.enabledPrefetch(request.Range) {
354
		reuse = ptm.StorageManager.FindCompletedSubTask(taskID)
355
	} else {
356
		reuse = ptm.StorageManager.FindCompletedTask(taskID)
357
	}
358

359
	if reuse == nil {
360
		return nil, false
361

362
		// if request.Range == nil {
363
		// return nil, false
364
		// }
365
		// TODO, mock SeedTaskResponse for sub task
366
		// for ranged request, check the parent task
367
		//reuseRange = request.Range
368
		//taskID = idgen.ParentTaskID(request.Url, request.UrlMeta)
369
		//reuse = ptm.StorageManager.FindPartialCompletedTask(taskID, reuseRange)
370
		//if reuse == nil {
371
		//	return nil, false
372
		//}
373
	}
374

375
	if reuseRange == nil {
376
		log = logger.With("peer", request.PeerId, "task", taskID, "component", "reuseSeedPeerTask")
377
		log.Infof("reuse from peer task: %s, total size: %d", reuse.PeerID, reuse.ContentLength)
378
	} else {
379
		log = logger.With("peer", request.PeerId, "task", taskID, "range", request.UrlMeta.Range,
380
			"component", "reuseRangeSeedPeerTask")
381
		log.Infof("reuse partial data from peer task: %s, total size: %d, range: %s",
382
			reuse.PeerID, reuse.ContentLength, request.UrlMeta.Range)
383
	}
384

385
	ctx, span := tracer.Start(ctx, config.SpanReusePeerTask, trace.WithSpanKind(trace.SpanKindClient))
386
	span.SetAttributes(config.AttributePeerHost.String(ptm.PeerHost.Id))
387
	span.SetAttributes(semconv.NetHostIPKey.String(ptm.PeerHost.Ip))
388
	span.SetAttributes(config.AttributeTaskID.String(taskID))
389
	span.SetAttributes(config.AttributePeerID.String(request.PeerId))
390
	span.SetAttributes(config.AttributeReusePeerID.String(reuse.PeerID))
391
	span.SetAttributes(semconv.HTTPURLKey.String(request.Url))
392
	if reuseRange != nil {
393
		span.SetAttributes(config.AttributeReuseRange.String(request.UrlMeta.Range))
394
	}
395

396
	successCh := make(chan struct{}, 1)
397
	successCh <- struct{}{}
398

399
	span.SetAttributes(config.AttributePeerTaskSuccess.Bool(true))
400
	return &SeedTaskResponse{
401
		Context: ctx,
402
		Span:    span,
403
		TaskID:  taskID,
404
		PeerID:  reuse.PeerID,
405
		SubscribeResponse: SubscribeResponse{
406
			Storage:          reuse.Storage,
407
			PieceInfoChannel: nil,
408
			Success:          successCh,
409
			Fail:             nil,
410
			FailReason: func() error {
411
				return nil
412
			},
413
		},
414
	}, true
415
}
416

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.