Dragonfly2
415 строк · 13.7 Кб
1/*
2* Copyright 2020 The Dragonfly Authors
3*
4* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
7*
8* http://www.apache.org/licenses/LICENSE-2.0
9*
10* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
15*/
16
17package peer18
19import (20"context"21"fmt"22"io"23"os"24"time"25
26"github.com/go-http-utils/headers"27semconv "go.opentelemetry.io/otel/semconv/v1.7.0"28"go.opentelemetry.io/otel/trace"29
30commonv1 "d7y.io/api/v2/pkg/apis/common/v1"31
32"d7y.io/dragonfly/v2/client/config"33"d7y.io/dragonfly/v2/client/daemon/storage"34logger "d7y.io/dragonfly/v2/internal/dflog"35"d7y.io/dragonfly/v2/pkg/idgen"36"d7y.io/dragonfly/v2/pkg/net/http"37)
38
39var _ *logger.SugaredLoggerOnWith // pin this package for no log code generation40
// Reuse task search logic:
// A. Prefetch feature enabled:
//    - ranged request: 1. find a completed sub task; 2. otherwise, find a partially completed parent task.
//    - non-ranged request: just find a completed task.
// B. Prefetch feature disabled:
//    - ranged request: 1. find a completed normal task; 2. otherwise, find a partially completed parent task.
//    - non-ranged request: just find a completed task.
48
49func (ptm *peerTaskManager) tryReuseFilePeerTask(ctx context.Context,50request *FileTaskRequest) (chan *FileTaskProgress, bool) {51taskID := idgen.TaskIDV1(request.Url, request.UrlMeta)52var (53reuse *storage.ReusePeerTask54reuseRange *http.Range // the range of parent peer task data to read55log *logger.SugaredLoggerOnWith56length int6457err error58)59
60if ptm.enabledPrefetch(request.Range) {61reuse = ptm.StorageManager.FindCompletedSubTask(taskID)62} else {63reuse = ptm.StorageManager.FindCompletedTask(taskID)64}65
66if reuse == nil {67if request.Range == nil {68return nil, false69}70// for ranged request, check the parent task71reuseRange = request.Range72taskID = idgen.ParentTaskIDV1(request.Url, request.UrlMeta)73reuse = ptm.StorageManager.FindPartialCompletedTask(taskID, reuseRange)74if reuse == nil {75return nil, false76}77}78
79logKV := []any{80"peer", request.PeerId,81"task", taskID,82}83
84if spanContext := trace.SpanFromContext(ctx).SpanContext(); spanContext.TraceID().IsValid() {85logKV = append(logKV, "trace", spanContext.TraceID().String())86}87
88if reuseRange == nil {89logKV = append(logKV, "component", "reuseFilePeerTask")90log = logger.With(logKV...)91log.Infof("reuse from peer task: %s, total size: %d", reuse.PeerID, reuse.ContentLength)92length = reuse.ContentLength93} else {94logKV = append(logKV, "range", request.UrlMeta.Range, "component", "reuseRangeFilePeerTask")95log = logger.With(logKV...)96log.Infof("reuse partial data from peer task: %s, total size: %d, range: %s",97reuse.PeerID, reuse.ContentLength, request.UrlMeta.Range)98
99// correct range like: bytes=1024-100if reuseRange.Start+reuseRange.Length > reuse.ContentLength {101reuseRange.Length = reuse.ContentLength - reuseRange.Start102if reuseRange.Length < 0 {103return nil, false104}105}106length = reuseRange.Length107}108
109_, span := tracer.Start(ctx, config.SpanReusePeerTask, trace.WithSpanKind(trace.SpanKindClient))110span.SetAttributes(config.AttributePeerHost.String(ptm.PeerHost.Id))111span.SetAttributes(semconv.NetHostIPKey.String(ptm.PeerHost.Ip))112span.SetAttributes(config.AttributeTaskID.String(taskID))113span.SetAttributes(config.AttributePeerID.String(request.PeerId))114span.SetAttributes(config.AttributeReusePeerID.String(reuse.PeerID))115span.SetAttributes(semconv.HTTPURLKey.String(request.Url))116if reuseRange != nil {117span.SetAttributes(config.AttributeReuseRange.String(request.UrlMeta.Range))118}119defer span.End()120
121log.Infof("reuse from peer task: %s, total size: %d, target size: %d", reuse.PeerID, reuse.ContentLength, length)122span.AddEvent("reuse peer task", trace.WithAttributes(config.AttributePeerID.String(reuse.PeerID)))123
124start := time.Now()125if reuseRange == nil || request.KeepOriginalOffset {126storeRequest := &storage.StoreRequest{127CommonTaskRequest: storage.CommonTaskRequest{128PeerID: reuse.PeerID,129TaskID: taskID,130Destination: request.Output,131},132MetadataOnly: false,133StoreDataOnly: true,134TotalPieces: reuse.TotalPieces,135OriginalOffset: request.KeepOriginalOffset,136}137err = ptm.StorageManager.Store(ctx, storeRequest)138} else {139err = ptm.storePartialFile(ctx, request, log, reuse, reuseRange)140}141
142if err != nil {143log.Errorf("store error when reuse peer task: %s", err)144span.SetAttributes(config.AttributePeerTaskSuccess.Bool(false))145span.RecordError(err)146return nil, false147}148
149// check reuse target is valid150stat, err := os.Stat(request.Output)151if err != nil {152log.Errorf("stat error when reuse peer task: %s", err)153span.SetAttributes(config.AttributePeerTaskSuccess.Bool(false))154span.RecordError(err)155return nil, false156}157
158if request.KeepOriginalOffset {159// KeepOriginalOffset case160if length > 0 && stat.Size() == 0 {161err = fmt.Errorf("reuse failed, output file size is zero, but target length %d is not zero", length)162log.Errorf(err.Error())163span.SetAttributes(config.AttributePeerTaskSuccess.Bool(false))164span.RecordError(err)165return nil, false166}167} else if length != stat.Size() {168// normal case169err = fmt.Errorf("reuse failed, output file size %d is not same with target length %d", stat.Size(), length)170log.Errorf(err.Error())171span.SetAttributes(config.AttributePeerTaskSuccess.Bool(false))172span.RecordError(err)173return nil, false174}175
176var cost = time.Since(start).Milliseconds()177log.Infof("reuse file peer task done, cost: %dms", cost)178
179pg := &FileTaskProgress{180State: &ProgressState{181Success: true,182Code: commonv1.Code_Success,183Msg: "Success",184},185TaskID: taskID,186PeerID: request.PeerId,187ContentLength: length,188CompletedLength: length,189PeerTaskDone: true,190DoneCallback: func() {},191}192
193// make a new buffered channel, because we did not need to call newFileTask194progressCh := make(chan *FileTaskProgress, 1)195progressCh <- pg196
197span.SetAttributes(config.AttributePeerTaskSuccess.Bool(true))198span.SetAttributes(config.AttributePeerTaskCost.Int64(cost))199return progressCh, true200}
201
202func (ptm *peerTaskManager) storePartialFile(ctx context.Context, request *FileTaskRequest,203log *logger.SugaredLoggerOnWith, reuse *storage.ReusePeerTask, rg *http.Range) error {204f, err := os.OpenFile(request.Output, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)205if err != nil {206log.Errorf("open dest file error when reuse peer task: %s", err)207return err208}209rc, err := ptm.StorageManager.ReadAllPieces(ctx,210&storage.ReadAllPiecesRequest{PeerTaskMetadata: reuse.PeerTaskMetadata, Range: rg})211if err != nil {212log.Errorf("read pieces error when reuse peer task: %s", err)213return err214}215defer rc.Close()216n, err := io.Copy(f, rc)217if err != nil {218log.Errorf("copy data error when reuse peer task: %s", err)219return err220}221if n != rg.Length {222log.Errorf("copy data length not match when reuse peer task, actual: %d, desire: %d", n, rg.Length)223return io.ErrShortBuffer224}225return nil226}
227
228func (ptm *peerTaskManager) tryReuseStreamPeerTask(ctx context.Context, taskID string,229request *StreamTaskRequest) (io.ReadCloser, map[string]string, bool) {230var (231reuse *storage.ReusePeerTask232reuseRange *http.Range // the range of parent peer task data to read233log *logger.SugaredLoggerOnWith234length int64235)236
237if ptm.enabledPrefetch(request.Range) {238reuse = ptm.StorageManager.FindCompletedSubTask(taskID)239} else {240reuse = ptm.StorageManager.FindCompletedTask(taskID)241}242
243if reuse == nil {244if request.Range == nil {245return nil, nil, false246}247// for ranged request, check the parent task248reuseRange = request.Range249taskID = idgen.ParentTaskIDV1(request.URL, request.URLMeta)250reuse = ptm.StorageManager.FindPartialCompletedTask(taskID, reuseRange)251if reuse == nil {252return nil, nil, false253}254}255
256logKV := []any{257"peer", request.PeerID,258"task", taskID,259}260if spanContext := trace.SpanFromContext(ctx).SpanContext(); spanContext.TraceID().IsValid() {261logKV = append(logKV, "trace", spanContext.TraceID().String())262}263
264if reuseRange == nil {265logKV = append(logKV, "component", "reuseStreamPeerTask")266log = logger.With(logKV...)267log.Infof("reuse from peer task: %s, total size: %d", reuse.PeerID, reuse.ContentLength)268length = reuse.ContentLength269} else {270logKV = append(logKV, "component", "reuseRangeStreamPeerTask")271log = logger.With(logKV...)272log.Infof("reuse partial data from peer task: %s, total size: %d, range: %s",273reuse.PeerID, reuse.ContentLength, request.URLMeta.Range)274
275// correct range like: bytes=1024-276if reuseRange.Length > reuse.ContentLength-reuseRange.Start {277reuseRange.Length = reuse.ContentLength - reuseRange.Start278if reuseRange.Length < 0 {279return nil, nil, false280}281}282length = reuseRange.Length283}284
285ctx, span := tracer.Start(ctx, config.SpanStreamTask, trace.WithSpanKind(trace.SpanKindClient))286span.SetAttributes(config.AttributePeerHost.String(ptm.PeerHost.Id))287span.SetAttributes(semconv.NetHostIPKey.String(ptm.PeerHost.Ip))288span.SetAttributes(config.AttributeTaskID.String(taskID))289span.SetAttributes(config.AttributePeerID.String(request.PeerID))290span.SetAttributes(config.AttributeReusePeerID.String(reuse.PeerID))291span.SetAttributes(semconv.HTTPURLKey.String(request.URL))292if reuseRange != nil {293span.SetAttributes(config.AttributeReuseRange.String(request.URLMeta.Range))294}295defer span.End()296
297rc, err := ptm.StorageManager.ReadAllPieces(ctx,298&storage.ReadAllPiecesRequest{PeerTaskMetadata: reuse.PeerTaskMetadata, Range: reuseRange})299if err != nil {300log.Errorf("read pieces error when reuse peer task: %s", err)301span.SetAttributes(config.AttributePeerTaskSuccess.Bool(false))302span.RecordError(err)303return nil, nil, false304}305
306exa, err := ptm.StorageManager.GetExtendAttribute(ctx, &reuse.PeerTaskMetadata)307if err != nil {308log.Errorf("get extend attribute error when reuse peer task: %s", err)309span.SetAttributes(config.AttributePeerTaskSuccess.Bool(false))310span.RecordError(err)311return nil, nil, false312}313
314attr := map[string]string{}315attr[config.HeaderDragonflyTask] = taskID316attr[config.HeaderDragonflyPeer] = request.PeerID317attr[headers.ContentLength] = fmt.Sprintf("%d", length)318
319if exa != nil {320for k, v := range exa.Header {321attr[k] = v322}323}324
325if reuseRange != nil {326attr[config.HeaderDragonflyRange] = request.URLMeta.Range327attr[headers.ContentRange] = fmt.Sprintf("bytes %d-%d/%d", reuseRange.Start,328reuseRange.Start+reuseRange.Length-1, reuse.ContentLength)329} else if request.Range != nil {330// the length is from reuse task, ensure it equal with request331if length != request.Range.Length {332log.Errorf("target task length %d did not match range length %d", length, request.Range.Length)333return nil, nil, false334}335attr[headers.ContentRange] = fmt.Sprintf("bytes %d-%d/*", request.Range.Start,336request.Range.Start+request.Range.Length-1)337}338
339// TODO record time when file closed, need add a type to implement Close and WriteTo340span.SetAttributes(config.AttributePeerTaskSuccess.Bool(true))341return rc, attr, true342}
343
344func (ptm *peerTaskManager) tryReuseSeedPeerTask(ctx context.Context,345request *SeedTaskRequest) (*SeedTaskResponse, bool) {346taskID := idgen.TaskIDV1(request.Url, request.UrlMeta)347var (348reuse *storage.ReusePeerTask349reuseRange *http.Range // the range of parent peer task data to read350log *logger.SugaredLoggerOnWith351)352
353if ptm.enabledPrefetch(request.Range) {354reuse = ptm.StorageManager.FindCompletedSubTask(taskID)355} else {356reuse = ptm.StorageManager.FindCompletedTask(taskID)357}358
359if reuse == nil {360return nil, false361
362// if request.Range == nil {363// return nil, false364// }365// TODO, mock SeedTaskResponse for sub task366// for ranged request, check the parent task367//reuseRange = request.Range368//taskID = idgen.ParentTaskID(request.Url, request.UrlMeta)369//reuse = ptm.StorageManager.FindPartialCompletedTask(taskID, reuseRange)370//if reuse == nil {371// return nil, false372//}373}374
375if reuseRange == nil {376log = logger.With("peer", request.PeerId, "task", taskID, "component", "reuseSeedPeerTask")377log.Infof("reuse from peer task: %s, total size: %d", reuse.PeerID, reuse.ContentLength)378} else {379log = logger.With("peer", request.PeerId, "task", taskID, "range", request.UrlMeta.Range,380"component", "reuseRangeSeedPeerTask")381log.Infof("reuse partial data from peer task: %s, total size: %d, range: %s",382reuse.PeerID, reuse.ContentLength, request.UrlMeta.Range)383}384
385ctx, span := tracer.Start(ctx, config.SpanReusePeerTask, trace.WithSpanKind(trace.SpanKindClient))386span.SetAttributes(config.AttributePeerHost.String(ptm.PeerHost.Id))387span.SetAttributes(semconv.NetHostIPKey.String(ptm.PeerHost.Ip))388span.SetAttributes(config.AttributeTaskID.String(taskID))389span.SetAttributes(config.AttributePeerID.String(request.PeerId))390span.SetAttributes(config.AttributeReusePeerID.String(reuse.PeerID))391span.SetAttributes(semconv.HTTPURLKey.String(request.Url))392if reuseRange != nil {393span.SetAttributes(config.AttributeReuseRange.String(request.UrlMeta.Range))394}395
396successCh := make(chan struct{}, 1)397successCh <- struct{}{}398
399span.SetAttributes(config.AttributePeerTaskSuccess.Bool(true))400return &SeedTaskResponse{401Context: ctx,402Span: span,403TaskID: taskID,404PeerID: reuse.PeerID,405SubscribeResponse: SubscribeResponse{406Storage: reuse.Storage,407PieceInfoChannel: nil,408Success: successCh,409Fail: nil,410FailReason: func() error {411return nil412},413},414}, true415}
416