Dragonfly2
270 строк · 7.3 Кб
1/*
2* Copyright 2020 The Dragonfly Authors
3*
4* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
7*
8* http://www.apache.org/licenses/LICENSE-2.0
9*
10* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
15*/
16
17//go:generate mockgen -destination mocks/upload_manager_mock.go -source upload_manager.go -package mocks
18
19package upload
20
21import (
22"context"
23"crypto/tls"
24"fmt"
25"io"
26"math"
27"net"
28"net/http"
29"strings"
30"time"
31
32ginzap "github.com/gin-contrib/zap"
33"github.com/gin-gonic/gin"
34"github.com/go-http-utils/headers"
35"github.com/johanbrandhorst/certify"
36ginprometheus "github.com/mcuadros/go-gin-prometheus"
37"github.com/soheilhy/cmux"
38"go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin"
39"golang.org/x/time/rate"
40
41"d7y.io/dragonfly/v2/client/config"
42"d7y.io/dragonfly/v2/client/daemon/storage"
43logger "d7y.io/dragonfly/v2/internal/dflog"
44nethttp "d7y.io/dragonfly/v2/pkg/net/http"
45)
46
47const (
48PrometheusSubsystemName = "dragonfly_dfdaemon_upload"
49OtelServiceName = "dragonfly-dfdaemon-upload"
50)
51
52const (
53RouterGroupDownload = "/download"
54)
55
56var GinLogFileName = "gin-upload.log"
57
58// Manager is the interface used for upload task.
59type Manager interface {
60// Started upload manager server.
61Serve(lis net.Listener) error
62
63// Stop upload manager server.
64Stop() error
65}
66
67// uploadManager provides upload manager function.
68type uploadManager struct {
69*http.Server
70*rate.Limiter
71storageManager storage.Manager
72certify *certify.Certify
73}
74
75// Option is a functional option for configuring the upload manager.
76type Option func(um *uploadManager)
77
78// WithLimiter sets upload rate limiter, the burst size must be bigger than piece size.
79func WithLimiter(limiter *rate.Limiter) func(*uploadManager) {
80return func(manager *uploadManager) {
81manager.Limiter = limiter
82}
83}
84
85func WithCertify(ct *certify.Certify) func(manager *uploadManager) {
86return func(manager *uploadManager) {
87manager.certify = ct
88}
89}
90
91// New returns a new Manager instence.
92func NewUploadManager(cfg *config.DaemonOption, storageManager storage.Manager, logDir string, opts ...Option) (Manager, error) {
93um := &uploadManager{
94storageManager: storageManager,
95}
96
97router := um.initRouter(cfg, logDir)
98um.Server = &http.Server{
99Handler: router,
100}
101
102for _, opt := range opts {
103opt(um)
104}
105
106return um, nil
107}
108
109// Started upload manager server.
110func (um *uploadManager) Serve(listener net.Listener) error {
111if um.certify == nil {
112return um.Server.Serve(listener)
113}
114
115logger.Debugf("use http and https uploader in same listener")
116m := cmux.New(listener)
117httpListener := m.Match(cmux.HTTP1Fast())
118tlsListener := m.Match(cmux.Any())
119
120go func() {
121if err := um.Server.Serve(httpListener); err != nil {
122logger.Debugf("upload server exit: %s", err)
123}
124}()
125
126go func() {
127tlsConfig := &tls.Config{
128GetCertificate: func(hello *tls.ClientHelloInfo) (*tls.Certificate, error) {
129// FIXME peers need pure ip cert, certify checks the ServerName, so workaround here
130hello.ServerName = "peer"
131return um.certify.GetCertificate(hello)
132},
133}
134
135tlsListener = tls.NewListener(tlsListener, tlsConfig)
136if err := um.Server.Serve(tlsListener); err != nil {
137logger.Debugf("upload server exit: %s", err)
138}
139}()
140return m.Serve()
141}
142
143// Stop upload manager server.
144func (um *uploadManager) Stop() error {
145return um.Server.Shutdown(context.Background())
146}
147
148// Initialize router of gin.
149func (um *uploadManager) initRouter(cfg *config.DaemonOption, logDir string) *gin.Engine {
150// Set mode
151if !cfg.Verbose {
152gin.SetMode(gin.ReleaseMode)
153}
154
155r := gin.New()
156
157// Middleware
158r.Use(gin.Recovery())
159r.Use(ginzap.Ginzap(logger.GinLogger.Desugar(), time.RFC3339, true))
160r.Use(ginzap.RecoveryWithZap(logger.GinLogger.Desugar(), true))
161
162// Prometheus metrics
163p := ginprometheus.NewPrometheus(PrometheusSubsystemName)
164// Prometheus metrics need to reduce label,
165// refer to https://prometheus.io/docs/practices/instrumentation/#do-not-overuse-labels.
166p.ReqCntURLLabelMappingFn = func(c *gin.Context) string {
167if strings.HasPrefix(c.Request.URL.Path, RouterGroupDownload) {
168return RouterGroupDownload
169}
170
171return c.Request.URL.Path
172}
173p.Use(r)
174
175// Opentelemetry
176if cfg.Options.Telemetry.Jaeger != "" {
177r.Use(otelgin.Middleware(OtelServiceName))
178}
179
180// Health Check.
181r.GET("/healthy", um.getHealth)
182
183// Peer download task.
184d := r.Group(RouterGroupDownload)
185d.GET(":task_prefix/:task_id", um.getDownload)
186
187return r
188}
189
190// getHealth uses to check server health.
191func (um *uploadManager) getHealth(ctx *gin.Context) {
192ctx.JSON(http.StatusOK, http.StatusText(http.StatusOK))
193}
194
195// getDownload uses to upload a task file when other peers download from it.
196func (um *uploadManager) getDownload(ctx *gin.Context) {
197var params DownloadParams
198if err := ctx.ShouldBindUri(¶ms); err != nil {
199ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
200return
201}
202
203var query DownalodQuery
204if err := ctx.ShouldBindQuery(&query); err != nil {
205ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
206return
207}
208
209taskID := params.TaskID
210peerID := query.PeerID
211
212log := logger.WithTaskAndPeerID(taskID, peerID).With("component", "uploadManager")
213log.Debugf("upload piece for task %s/%s to %s, request header: %#v", taskID, peerID, ctx.Request.RemoteAddr, ctx.Request.Header)
214rg, err := nethttp.ParseRange(ctx.GetHeader(headers.Range), math.MaxInt64)
215if err != nil {
216log.Errorf("parse range with error: %s", err)
217ctx.JSON(http.StatusBadRequest, gin.H{"errors": err.Error()})
218return
219}
220
221if len(rg) != 1 {
222log.Error("multi range parsed, not support")
223ctx.JSON(http.StatusBadRequest, gin.H{"errors": "invalid range"})
224return
225}
226
227reader, closer, err := um.storageManager.ReadPiece(ctx,
228&storage.ReadPieceRequest{
229PeerTaskMetadata: storage.PeerTaskMetadata{
230TaskID: taskID,
231PeerID: peerID,
232},
233PieceMetadata: storage.PieceMetadata{
234Num: -1,
235Range: rg[0],
236},
237})
238if err != nil {
239log.Errorf("get task data failed: %s", err)
240ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
241return
242}
243defer closer.Close()
244
245// Add header "Content-Length" to avoid chunked body in http client.
246ctx.Header(headers.ContentLength, fmt.Sprintf("%d", rg[0].Length))
247
248// write header immediately, prevent client disconnecting after limiter.Wait() due to response header timeout
249ctx.Writer.WriteHeaderNow()
250ctx.Writer.Flush()
251
252if um.Limiter != nil {
253if err = um.Limiter.WaitN(ctx, int(rg[0].Length)); err != nil {
254log.Errorf("get limit failed: %s", err)
255ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
256return
257}
258}
259
260// If w is a socket, golang will use sendfile or splice syscall for zero copy feature
261// when start to transfer data, we could not call http.Error with header.
262if n, err := io.Copy(ctx.Writer, reader); err != nil {
263log.Errorf("transfer data failed: %s", err)
264return
265} else if n != rg[0].Length {
266log.Errorf("transferred data length not match request, request: %d, transferred: %d",
267rg[0].Length, n)
268return
269}
270}
271