Dragonfly2

Форк
0
/
upload_manager.go 
270 строк · 7.3 Кб
1
/*
2
 *     Copyright 2020 The Dragonfly Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *      http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
//go:generate mockgen -destination mocks/upload_manager_mock.go -source upload_manager.go -package mocks
18

19
package upload
20

21
import (
22
	"context"
23
	"crypto/tls"
24
	"fmt"
25
	"io"
26
	"math"
27
	"net"
28
	"net/http"
29
	"strings"
30
	"time"
31

32
	ginzap "github.com/gin-contrib/zap"
33
	"github.com/gin-gonic/gin"
34
	"github.com/go-http-utils/headers"
35
	"github.com/johanbrandhorst/certify"
36
	ginprometheus "github.com/mcuadros/go-gin-prometheus"
37
	"github.com/soheilhy/cmux"
38
	"go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin"
39
	"golang.org/x/time/rate"
40

41
	"d7y.io/dragonfly/v2/client/config"
42
	"d7y.io/dragonfly/v2/client/daemon/storage"
43
	logger "d7y.io/dragonfly/v2/internal/dflog"
44
	nethttp "d7y.io/dragonfly/v2/pkg/net/http"
45
)
46

47
// Names used by the upload server for metrics, tracing and routing.
const (
	// PrometheusSubsystemName is the subsystem name handed to the gin
	// Prometheus middleware.
	PrometheusSubsystemName = "dragonfly_dfdaemon_upload"

	// OtelServiceName is the service name handed to the OpenTelemetry gin
	// middleware.
	OtelServiceName = "dragonfly-dfdaemon-upload"

	// RouterGroupDownload is the path prefix of the route group that serves
	// task data to peers.
	RouterGroupDownload = "/download"
)

// GinLogFileName is the file name of the gin log of the upload server.
// NOTE(review): not referenced in this file — presumably consumed by the
// logging setup elsewhere; confirm.
var GinLogFileName = "gin-upload.log"
57

58
// Manager describes the lifecycle of the upload server that hands task data
// to other peers.
type Manager interface {
	// Serve starts the upload manager server on the given listener.
	Serve(listener net.Listener) error

	// Stop stops the upload manager server.
	Stop() error
}
66

67
// uploadManager provides upload manager function.
68
type uploadManager struct {
69
	*http.Server
70
	*rate.Limiter
71
	storageManager storage.Manager
72
	certify        *certify.Certify
73
}
74

75
// Option is a functional option for configuring the upload manager.
76
type Option func(um *uploadManager)
77

78
// WithLimiter sets upload rate limiter, the burst size must be bigger than piece size.
79
func WithLimiter(limiter *rate.Limiter) func(*uploadManager) {
80
	return func(manager *uploadManager) {
81
		manager.Limiter = limiter
82
	}
83
}
84

85
// WithCertify returns an option that sets the certify instance used to obtain
// TLS certificates. When one is set, Serve accepts both HTTP and HTTPS on the
// same listener.
func WithCertify(ct *certify.Certify) func(manager *uploadManager) {
	return func(um *uploadManager) { um.certify = ct }
}
90

91
// New returns a new Manager instence.
92
func NewUploadManager(cfg *config.DaemonOption, storageManager storage.Manager, logDir string, opts ...Option) (Manager, error) {
93
	um := &uploadManager{
94
		storageManager: storageManager,
95
	}
96

97
	router := um.initRouter(cfg, logDir)
98
	um.Server = &http.Server{
99
		Handler: router,
100
	}
101

102
	for _, opt := range opts {
103
		opt(um)
104
	}
105

106
	return um, nil
107
}
108

109
// Started upload manager server.
110
func (um *uploadManager) Serve(listener net.Listener) error {
111
	if um.certify == nil {
112
		return um.Server.Serve(listener)
113
	}
114

115
	logger.Debugf("use http and https uploader in same listener")
116
	m := cmux.New(listener)
117
	httpListener := m.Match(cmux.HTTP1Fast())
118
	tlsListener := m.Match(cmux.Any())
119

120
	go func() {
121
		if err := um.Server.Serve(httpListener); err != nil {
122
			logger.Debugf("upload server exit: %s", err)
123
		}
124
	}()
125

126
	go func() {
127
		tlsConfig := &tls.Config{
128
			GetCertificate: func(hello *tls.ClientHelloInfo) (*tls.Certificate, error) {
129
				// FIXME peers need pure ip cert, certify checks the ServerName, so workaround here
130
				hello.ServerName = "peer"
131
				return um.certify.GetCertificate(hello)
132
			},
133
		}
134

135
		tlsListener = tls.NewListener(tlsListener, tlsConfig)
136
		if err := um.Server.Serve(tlsListener); err != nil {
137
			logger.Debugf("upload server exit: %s", err)
138
		}
139
	}()
140
	return m.Serve()
141
}
142

143
// Stop upload manager server.
144
func (um *uploadManager) Stop() error {
145
	return um.Server.Shutdown(context.Background())
146
}
147

148
// Initialize router of gin.
149
func (um *uploadManager) initRouter(cfg *config.DaemonOption, logDir string) *gin.Engine {
150
	// Set mode
151
	if !cfg.Verbose {
152
		gin.SetMode(gin.ReleaseMode)
153
	}
154

155
	r := gin.New()
156

157
	// Middleware
158
	r.Use(gin.Recovery())
159
	r.Use(ginzap.Ginzap(logger.GinLogger.Desugar(), time.RFC3339, true))
160
	r.Use(ginzap.RecoveryWithZap(logger.GinLogger.Desugar(), true))
161

162
	// Prometheus metrics
163
	p := ginprometheus.NewPrometheus(PrometheusSubsystemName)
164
	// Prometheus metrics need to reduce label,
165
	// refer to https://prometheus.io/docs/practices/instrumentation/#do-not-overuse-labels.
166
	p.ReqCntURLLabelMappingFn = func(c *gin.Context) string {
167
		if strings.HasPrefix(c.Request.URL.Path, RouterGroupDownload) {
168
			return RouterGroupDownload
169
		}
170

171
		return c.Request.URL.Path
172
	}
173
	p.Use(r)
174

175
	// Opentelemetry
176
	if cfg.Options.Telemetry.Jaeger != "" {
177
		r.Use(otelgin.Middleware(OtelServiceName))
178
	}
179

180
	// Health Check.
181
	r.GET("/healthy", um.getHealth)
182

183
	// Peer download task.
184
	d := r.Group(RouterGroupDownload)
185
	d.GET(":task_prefix/:task_id", um.getDownload)
186

187
	return r
188
}
189

190
// getHealth uses to check server health.
191
func (um *uploadManager) getHealth(ctx *gin.Context) {
192
	ctx.JSON(http.StatusOK, http.StatusText(http.StatusOK))
193
}
194

195
// getDownload uses to upload a task file when other peers download from it.
196
func (um *uploadManager) getDownload(ctx *gin.Context) {
197
	var params DownloadParams
198
	if err := ctx.ShouldBindUri(&params); err != nil {
199
		ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
200
		return
201
	}
202

203
	var query DownalodQuery
204
	if err := ctx.ShouldBindQuery(&query); err != nil {
205
		ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
206
		return
207
	}
208

209
	taskID := params.TaskID
210
	peerID := query.PeerID
211

212
	log := logger.WithTaskAndPeerID(taskID, peerID).With("component", "uploadManager")
213
	log.Debugf("upload piece for task %s/%s to %s, request header: %#v", taskID, peerID, ctx.Request.RemoteAddr, ctx.Request.Header)
214
	rg, err := nethttp.ParseRange(ctx.GetHeader(headers.Range), math.MaxInt64)
215
	if err != nil {
216
		log.Errorf("parse range with error: %s", err)
217
		ctx.JSON(http.StatusBadRequest, gin.H{"errors": err.Error()})
218
		return
219
	}
220

221
	if len(rg) != 1 {
222
		log.Error("multi range parsed, not support")
223
		ctx.JSON(http.StatusBadRequest, gin.H{"errors": "invalid range"})
224
		return
225
	}
226

227
	reader, closer, err := um.storageManager.ReadPiece(ctx,
228
		&storage.ReadPieceRequest{
229
			PeerTaskMetadata: storage.PeerTaskMetadata{
230
				TaskID: taskID,
231
				PeerID: peerID,
232
			},
233
			PieceMetadata: storage.PieceMetadata{
234
				Num:   -1,
235
				Range: rg[0],
236
			},
237
		})
238
	if err != nil {
239
		log.Errorf("get task data failed: %s", err)
240
		ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
241
		return
242
	}
243
	defer closer.Close()
244

245
	// Add header "Content-Length" to avoid chunked body in http client.
246
	ctx.Header(headers.ContentLength, fmt.Sprintf("%d", rg[0].Length))
247

248
	// write header immediately, prevent client disconnecting after limiter.Wait() due to response header timeout
249
	ctx.Writer.WriteHeaderNow()
250
	ctx.Writer.Flush()
251

252
	if um.Limiter != nil {
253
		if err = um.Limiter.WaitN(ctx, int(rg[0].Length)); err != nil {
254
			log.Errorf("get limit failed: %s", err)
255
			ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
256
			return
257
		}
258
	}
259

260
	// If w is a socket, golang will use sendfile or splice syscall for zero copy feature
261
	// when start to transfer data, we could not call http.Error with header.
262
	if n, err := io.Copy(ctx.Writer, reader); err != nil {
263
		log.Errorf("transfer data failed: %s", err)
264
		return
265
	} else if n != rg[0].Length {
266
		log.Errorf("transferred data length not match request, request: %d, transferred: %d",
267
			rg[0].Length, n)
268
		return
269
	}
270
}
271

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.