Dragonfly2

Форк
0
/
piece_downloader.go 
226 строк · 5.9 Кб
1
/*
 *     Copyright 2020 The Dragonfly Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16

17
//go:generate mockgen -destination piece_downloader_mock.go -source piece_downloader.go -package peer
18

19
package peer
20

21
import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"io"
	"net"
	"net/http"
	"net/url"
	"time"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/propagation"
	"google.golang.org/grpc/status"

	commonv1 "d7y.io/api/v2/pkg/apis/common/v1"

	"d7y.io/dragonfly/v2/client/daemon/storage"
	logger "d7y.io/dragonfly/v2/internal/dflog"
	"d7y.io/dragonfly/v2/pkg/digest"
	"d7y.io/dragonfly/v2/pkg/source"
)
43

44
type DownloadPieceRequest struct {
45
	piece      *commonv1.PieceInfo
46
	log        *logger.SugaredLoggerOnWith
47
	storage    storage.TaskStorageDriver
48
	TaskID     string
49
	PeerID     string
50
	DstPid     string
51
	DstAddr    string
52
	CalcDigest bool
53
}
54

55
type DownloadPieceResult struct {
56
	// Size of piece
57
	Size int64
58
	// BeginTime nanosecond
59
	BeginTime int64
60
	// FinishTime nanosecond
61
	FinishTime int64
62
	DstPeerID  string
63
	Fail       bool
64
	pieceInfo  *commonv1.PieceInfo
65
}
66

67
type PieceDownloader interface {
68
	DownloadPiece(context.Context, *DownloadPieceRequest) (io.Reader, io.Closer, error)
69
}
70

71
// PieceDownloaderOption is a function that receives a *pieceDownloader and
// may return an error.
// NOTE(review): nothing in this file accepts or applies these options —
// confirm whether it is still used elsewhere in the package.
type PieceDownloaderOption func(*pieceDownloader) error
72

73
// pieceDownloader is the HTTP implementation of PieceDownloader.
type pieceDownloader struct {
	// scheme is the URL scheme of download requests: "http", or "https" when
	// the downloader was created with a CA certificate pool.
	scheme string
	// httpClient performs the piece requests.
	httpClient *http.Client
}
77

78
// pieceDownloadError is returned by DownloadPiece when the HTTP round trip
// fails or the peer answers with a status code above 299.
type pieceDownloadError struct {
	// connectionError is true when the HTTP round trip itself failed.
	connectionError bool
	// status is the HTTP status line of an error response.
	status string
	// statusCode is the numeric HTTP status of an error response.
	statusCode int
	// target is the URL that was requested.
	target string
	// err is the underlying round-trip error; it is nil for status errors.
	err error
}
85

86
type backSourceError struct {
87
	err error
88
	st  *status.Status
89
}
90

91
func isConnectionError(err error) bool {
92
	if e, ok := err.(*pieceDownloadError); ok {
93
		return e.connectionError
94
	}
95
	return false
96
}
97

98
func isPieceNotFound(err error) bool {
99
	if e, ok := err.(*pieceDownloadError); ok {
100
		return e.statusCode == http.StatusNotFound
101
	}
102
	return false
103
}
104

105
func isBackSourceError(err error) bool {
106
	if _, ok := err.(*backSourceError); ok {
107
		return true
108
	}
109
	if _, ok := err.(*source.UnexpectedStatusCodeError); ok {
110
		return true
111
	}
112
	return false
113
}
114

115
// Error implements the error interface. Connection failures report the
// target and the underlying error; all other failures report the target and
// the HTTP status line.
func (e *pieceDownloadError) Error() string {
	if e.connectionError {
		// %v renders a nil underlying error as "<nil>" instead of the
		// "%!s(<nil>)" formatting artifact; non-nil errors print the same.
		return fmt.Sprintf("connect with %s with error: %v", e.target, e.err)
	}
	return fmt.Sprintf("download %s with error status: %s", e.target, e.status)
}
121

122
// Error implements the error interface. The gRPC status is preferred when it
// is set and yields a non-nil error; otherwise the plain error is used. A
// generic message is returned when neither is available, so that formatting
// an incompletely populated value never panics.
func (e *backSourceError) Error() string {
	if e.st != nil {
		// Guard against a nil result from Err before calling Error on it.
		// NOTE(review): presumably Err is nil for an OK status — confirm.
		if stErr := e.st.Err(); stErr != nil {
			return stErr.Error()
		}
	}
	if e.err != nil {
		return e.err.Error()
	}
	return "unknown back source error"
}
128

129
// Compile-time assertion that *pieceDownloader implements PieceDownloader.
var _ PieceDownloader = (*pieceDownloader)(nil)
130

131
// defaultTransport is the round tripper used by piece downloaders. Dial and
// response-header timeouts are kept at two seconds.
// NOTE(review): the short timeouts presumably exist so that an unreachable
// or stalled peer is abandoned quickly — confirm the intent.
var defaultTransport http.RoundTripper = &http.Transport{
	Proxy: http.ProxyFromEnvironment,
	// Dialer.DualStack is intentionally not set: the field is deprecated and
	// dual-stack dialing is enabled by default.
	DialContext: (&net.Dialer{
		Timeout:   2 * time.Second,
		KeepAlive: 30 * time.Second,
	}).DialContext,
	MaxIdleConns:          100,
	IdleConnTimeout:       90 * time.Second,
	ResponseHeaderTimeout: 2 * time.Second,
	TLSHandshakeTimeout:   10 * time.Second,
	ExpectContinueTimeout: 2 * time.Second,
}
144

145
func NewPieceDownloader(timeout time.Duration, caCertPool *x509.CertPool) PieceDownloader {
146
	pd := &pieceDownloader{
147
		scheme: "http",
148
		httpClient: &http.Client{
149
			Transport: defaultTransport,
150
			Timeout:   timeout,
151
		},
152
	}
153

154
	if caCertPool != nil {
155
		pd.scheme = "https"
156
		defaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{
157
			ClientCAs: caCertPool,
158
			RootCAs:   caCertPool,
159
		}
160
	}
161

162
	return pd
163
}
164

165
func (p *pieceDownloader) DownloadPiece(ctx context.Context, req *DownloadPieceRequest) (io.Reader, io.Closer, error) {
166
	httpRequest, err := p.buildDownloadPieceHTTPRequest(ctx, req)
167
	if err != nil {
168
		return nil, nil, err
169
	}
170
	resp, err := p.httpClient.Do(httpRequest)
171
	if err != nil {
172
		logger.Errorf("task id: %s, piece num: %d, dst: %s, download piece failed: %s",
173
			req.TaskID, req.piece.PieceNum, req.DstAddr, err)
174
		return nil, nil, &pieceDownloadError{
175
			target:          httpRequest.URL.String(),
176
			err:             err,
177
			connectionError: true,
178
		}
179
	}
180
	if resp.StatusCode > 299 {
181
		_, _ = io.Copy(io.Discard, resp.Body)
182
		_ = resp.Body.Close()
183
		return nil, nil, &pieceDownloadError{
184
			target:          httpRequest.URL.String(),
185
			err:             err,
186
			connectionError: false,
187
			status:          resp.Status,
188
			statusCode:      resp.StatusCode,
189
		}
190
	}
191
	reader, closer := resp.Body.(io.Reader), resp.Body.(io.Closer)
192
	if req.CalcDigest {
193
		req.log.Debugf("calculate digest for piece %d, digest: %s", req.piece.PieceNum, req.piece.PieceMd5)
194
		reader, err = digest.NewReader(digest.AlgorithmMD5, io.LimitReader(resp.Body, int64(req.piece.RangeSize)), digest.WithEncoded(req.piece.PieceMd5), digest.WithLogger(req.log))
195
		if err != nil {
196
			_ = closer.Close()
197
			req.log.Errorf("init digest reader error: %s", err.Error())
198
			return nil, nil, err
199
		}
200
	}
201
	return reader, closer, nil
202
}
203

204
func (p *pieceDownloader) buildDownloadPieceHTTPRequest(ctx context.Context, d *DownloadPieceRequest) (*http.Request, error) {
205
	if len(d.TaskID) <= 3 {
206
		return nil, fmt.Errorf("invalid task id")
207
	}
208
	// FIXME switch to https when tls enabled
209
	targetURL := url.URL{
210
		Scheme:   p.scheme,
211
		Host:     d.DstAddr,
212
		Path:     fmt.Sprintf("download/%s/%s", d.TaskID[:3], d.TaskID),
213
		RawQuery: fmt.Sprintf("peerId=%s", d.DstPid),
214
	}
215

216
	logger.Debugf("built request url: %s", targetURL.String())
217
	req, _ := http.NewRequestWithContext(ctx, http.MethodGet, targetURL.String(), nil)
218

219
	// TODO use string.Builder
220
	req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d",
221
		d.piece.RangeStart, d.piece.RangeStart+uint64(d.piece.RangeSize)-1))
222

223
	// inject trace id into request header
224
	otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(req.Header))
225
	return req, nil
226
}
227

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.