Dragonfly2
809 строк · 20.9 Кб
1/*
2* Copyright 2022 The Dragonfly Authors
3*
4* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
7*
8* http://www.apache.org/licenses/LICENSE-2.0
9*
10* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
15*/
16
17//go:generate mockgen -destination mocks/dfstore_mock.go -source dfstore.go -package mocks
18
19package dfstore
20
21import (
22"bytes"
23"context"
24"encoding/json"
25"errors"
26"fmt"
27"io"
28"mime/multipart"
29"net/http"
30"net/url"
31"path/filepath"
32"strconv"
33"strings"
34"time"
35
36"github.com/go-http-utils/headers"
37
38"d7y.io/dragonfly/v2/client/config"
39"d7y.io/dragonfly/v2/client/daemon/objectstorage"
40pkgobjectstorage "d7y.io/dragonfly/v2/pkg/objectstorage"
41)
42
43const (
44// DefaultGetObjectMetadatasLimit is the default limit of get object metadatas.
45DefaultGetObjectMetadatasLimit = 1000
46
47// MaxGetObjectMetadatasLimit is the max limit of get object metadatas.
48MaxGetObjectMetadatasLimit = 1000
49// DefaultPutObjectBufferSize is the buffer size of io.CopyBuffer
50DefaultPutObjectBufferSize = 64 * 1024 * 1024
51)
52
53// Dfstore is the interface used for object storage.
54type Dfstore interface {
55// CreateBucketRequestWithContext returns *http.Request of create bucket.
56CreateBucketRequestWithContext(ctx context.Context, input *CreateBucketInput) (*http.Request, error)
57
58// CreateBucket creates bucket.
59CreateBucketWithContext(ctx context.Context, input *CreateBucketInput) error
60
61// GetObjectMetadataRequestWithContext returns *http.Request of getting object metadata.
62GetObjectMetadataRequestWithContext(ctx context.Context, input *GetObjectMetadataInput) (*http.Request, error)
63
64// GetObjectMetadataWithContext returns matedata of object.
65GetObjectMetadataWithContext(ctx context.Context, input *GetObjectMetadataInput) (*pkgobjectstorage.ObjectMetadata, error)
66
67// GetObjectRequestWithContext returns *http.Request of getting object.
68GetObjectRequestWithContext(ctx context.Context, input *GetObjectInput) (*http.Request, error)
69
70// GetObjectWithContext returns data of object.
71GetObjectWithContext(ctx context.Context, input *GetObjectInput) (io.ReadCloser, error)
72
73// GetObjectMetadatasRequestWithContext returns *http.Request of getting object metadatas.
74GetObjectMetadatasRequestWithContext(ctx context.Context, input *GetObjectMetadatasInput) (*http.Request, error)
75
76// GetObjectMetadatasWithContext returns list of object metadatas.
77GetObjectMetadatasWithContext(ctx context.Context, input *GetObjectMetadatasInput) (*pkgobjectstorage.ObjectMetadatas, error)
78
79// PutObjectRequestWithContext returns *http.Request of putting object.
80PutObjectRequestWithContext(ctx context.Context, input *PutObjectInput) (*http.Request, error)
81
82// PutObjectWithContext puts data of object.
83PutObjectWithContext(ctx context.Context, input *PutObjectInput) error
84
85// CopyObjectRequestWithContext returns *http.Request of copying object.
86CopyObjectRequestWithContext(ctx context.Context, input *CopyObjectInput) (*http.Request, error)
87
88// CopyObjectWithContext copy object from source to destination.
89CopyObjectWithContext(ctx context.Context, input *CopyObjectInput) error
90
91// DeleteObjectRequestWithContext returns *http.Request of deleting object.
92DeleteObjectRequestWithContext(ctx context.Context, input *DeleteObjectInput) (*http.Request, error)
93
94// DeleteObjectWithContext deletes data of object.
95DeleteObjectWithContext(ctx context.Context, input *DeleteObjectInput) error
96
97// IsObjectExistRequestWithContext returns *http.Request of heading object.
98IsObjectExistRequestWithContext(ctx context.Context, input *IsObjectExistInput) (*http.Request, error)
99
100// IsObjectExistWithContext returns whether the object exists.
101IsObjectExistWithContext(ctx context.Context, input *IsObjectExistInput) (bool, error)
102}
103
104// dfstore provides object storage function.
105type dfstore struct {
106endpoint string
107httpClient *http.Client
108}
109
110// Option is a functional option for configuring the dfstore.
111type Option func(dfs *dfstore)
112
113// WithHTTPClient set http client for dfstore.
114func WithHTTPClient(client *http.Client) Option {
115return func(dfs *dfstore) {
116dfs.httpClient = client
117}
118}
119
120// New dfstore instance.
121func New(endpoint string, options ...Option) Dfstore {
122dfs := &dfstore{
123endpoint: endpoint,
124httpClient: http.DefaultClient,
125}
126
127for _, opt := range options {
128opt(dfs)
129}
130
131return dfs
132}
133
134// GetObjectMetadataInput is used to construct request of getting object metadata.
135type GetObjectMetadataInput struct {
136// BucketName is bucket name.
137BucketName string
138
139// ObjectKey is object key.
140ObjectKey string
141}
142
143// Validate validates GetObjectMetadataInput fields.
144func (i *GetObjectMetadataInput) Validate() error {
145if i.BucketName == "" {
146return errors.New("invalid BucketName")
147}
148
149if i.ObjectKey == "" {
150return errors.New("invalid ObjectKey")
151}
152
153return nil
154}
155
156// GetObjectMetadataRequestWithContext returns *http.Request of getting object metadata.
157func (dfs *dfstore) GetObjectMetadataRequestWithContext(ctx context.Context, input *GetObjectMetadataInput) (*http.Request, error) {
158if err := input.Validate(); err != nil {
159return nil, err
160}
161
162u, err := url.Parse(dfs.endpoint)
163if err != nil {
164return nil, err
165}
166
167u.Path = filepath.Join("buckets", input.BucketName, "objects", input.ObjectKey)
168
169if strings.HasSuffix(input.ObjectKey, "/") {
170u.Path += "/"
171}
172
173req, err := http.NewRequestWithContext(ctx, http.MethodHead, u.String(), nil)
174if err != nil {
175return nil, err
176}
177
178return req, nil
179}
180
181// GetObjectMetadataWithContext returns metadata of object.
182func (dfs *dfstore) GetObjectMetadataWithContext(ctx context.Context, input *GetObjectMetadataInput) (*pkgobjectstorage.ObjectMetadata, error) {
183req, err := dfs.GetObjectMetadataRequestWithContext(ctx, input)
184if err != nil {
185return nil, err
186}
187
188resp, err := dfs.httpClient.Do(req)
189if err != nil {
190return nil, err
191}
192defer resp.Body.Close()
193
194if resp.StatusCode/100 != 2 {
195return nil, fmt.Errorf("bad response status %s", resp.Status)
196}
197
198contentLength, err := strconv.ParseInt(resp.Header.Get(headers.ContentLength), 10, 64)
199if err != nil {
200return nil, err
201}
202
203lastModifiedTime, err := time.Parse(http.TimeFormat, resp.Header.Get(config.HeaderDragonflyObjectMetaLastModifiedTime))
204if err != nil {
205return nil, err
206}
207
208return &pkgobjectstorage.ObjectMetadata{
209ContentDisposition: resp.Header.Get(headers.ContentDisposition),
210ContentEncoding: resp.Header.Get(headers.ContentEncoding),
211ContentLanguage: resp.Header.Get(headers.ContentLanguage),
212ContentLength: int64(contentLength),
213ContentType: resp.Header.Get(headers.ContentType),
214ETag: resp.Header.Get(headers.ContentType),
215Digest: resp.Header.Get(config.HeaderDragonflyObjectMetaDigest),
216LastModifiedTime: lastModifiedTime,
217StorageClass: resp.Header.Get(config.HeaderDragonflyObjectMetaStorageClass),
218}, nil
219}
220
221// GetObjectMetadatasInput is used to construct request of getting object metadatas.
222type GetObjectMetadatasInput struct {
223// BucketName is the bucket name.
224BucketName string
225
226// Prefix filters the objects by their key's prefix.
227Prefix string
228
229// Marker is used for pagination, indicating the object key to start listing from.
230Marker string
231
232// Delimiter is used to create a hierarchical structure, simulating directories in the listing results.
233Delimiter string
234
235// Limit specifies the maximum number of objects to be returned in a single listing request.
236Limit int64
237}
238
239// Convert converts GetObjectMetadatasInput fields.
240func (i *GetObjectMetadatasInput) Convert() {
241if i.Limit == 0 {
242i.Limit = DefaultGetObjectMetadatasLimit
243}
244
245if i.Limit > MaxGetObjectMetadatasLimit {
246i.Limit = DefaultGetObjectMetadatasLimit
247}
248}
249
250// Validate validates GetObjectMetadatasInput fields.
251func (i *GetObjectMetadatasInput) Validate() error {
252if i.BucketName == "" {
253return errors.New("invalid BucketName")
254}
255
256if i.Limit <= 0 {
257return errors.New("invalid limit")
258}
259
260return nil
261}
262
263// GetObjectMetadatasRequestWithContext returns *http.Request of getting object metadatas.
264func (dfs *dfstore) GetObjectMetadatasRequestWithContext(ctx context.Context, input *GetObjectMetadatasInput) (*http.Request, error) {
265// Convert input fields.
266input.Convert()
267
268if err := input.Validate(); err != nil {
269return nil, err
270}
271
272u, err := url.Parse(dfs.endpoint)
273if err != nil {
274return nil, err
275}
276
277u.Path = filepath.Join("buckets", input.BucketName, "metadatas")
278
279query := u.Query()
280if input.Prefix != "" {
281query.Set("prefix", input.Prefix)
282}
283
284if input.Marker != "" {
285query.Set("marker", input.Marker)
286}
287
288if input.Delimiter != "" {
289query.Set("delimiter", input.Delimiter)
290}
291
292if input.Limit != 0 {
293query.Set("limit", fmt.Sprint(input.Limit))
294}
295
296u.RawQuery = query.Encode()
297
298req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
299if err != nil {
300return nil, err
301}
302
303return req, nil
304}
305
306// GetObjectMetadatasWithContext returns *http.Request of getting object metadatas.
307func (dfs *dfstore) GetObjectMetadatasWithContext(ctx context.Context, input *GetObjectMetadatasInput) (*pkgobjectstorage.ObjectMetadatas, error) {
308req, err := dfs.GetObjectMetadatasRequestWithContext(ctx, input)
309if err != nil {
310return nil, err
311}
312
313resp, err := dfs.httpClient.Do(req)
314if err != nil {
315return nil, err
316}
317defer resp.Body.Close()
318
319if resp.StatusCode/100 != 2 {
320return nil, fmt.Errorf("bad response status %s", resp.Status)
321}
322
323var metadatas pkgobjectstorage.ObjectMetadatas
324if err := json.NewDecoder(resp.Body).Decode(&metadatas); err != nil {
325return nil, err
326}
327
328return &metadatas, nil
329}
330
331// GetObjectInput is used to construct request of getting object.
332type GetObjectInput struct {
333// BucketName is bucket name.
334BucketName string
335
336// ObjectKey is object key.
337ObjectKey string
338
339// Filter is used to generate a unique Task ID by
340// filtering unnecessary query params in the URL,
341// it is separated by & character.
342Filter string
343
344// Range is the HTTP range header.
345Range string
346}
347
348// Validate validates GetObjectInput fields.
349func (i *GetObjectInput) Validate() error {
350if i.BucketName == "" {
351return errors.New("invalid BucketName")
352}
353
354if i.ObjectKey == "" {
355return errors.New("invalid ObjectKey")
356}
357
358return nil
359}
360
361// GetObjectRequestWithContext returns *http.Request of getting object.
362func (dfs *dfstore) GetObjectRequestWithContext(ctx context.Context, input *GetObjectInput) (*http.Request, error) {
363if err := input.Validate(); err != nil {
364return nil, err
365}
366
367u, err := url.Parse(dfs.endpoint)
368if err != nil {
369return nil, err
370}
371
372u.Path = filepath.Join("buckets", input.BucketName, "objects", input.ObjectKey)
373
374if strings.HasSuffix(input.ObjectKey, "/") {
375u.Path += "/"
376}
377
378query := u.Query()
379if input.Filter != "" {
380query.Set("filter", input.Filter)
381}
382u.RawQuery = query.Encode()
383
384req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
385if err != nil {
386return nil, err
387}
388
389if input.Range != "" {
390req.Header.Set(headers.Range, input.Range)
391}
392
393return req, nil
394}
395
396// GetObjectWithContext returns data of object.
397func (dfs *dfstore) GetObjectWithContext(ctx context.Context, input *GetObjectInput) (io.ReadCloser, error) {
398req, err := dfs.GetObjectRequestWithContext(ctx, input)
399if err != nil {
400return nil, err
401}
402
403resp, err := dfs.httpClient.Do(req)
404if err != nil {
405return nil, err
406}
407
408if resp.StatusCode/100 != 2 {
409return nil, fmt.Errorf("bad response status %s", resp.Status)
410}
411
412return resp.Body, nil
413}
414
415// PutObjectInput is used to construct request of putting object.
416type PutObjectInput struct {
417// BucketName is bucket name.
418BucketName string
419
420// ObjectKey is object key.
421ObjectKey string
422
423// Filter is used to generate a unique Task ID by
424// filtering unnecessary query params in the URL,
425// it is separated by & character.
426Filter string
427
428// Mode is the mode in which the backend is written,
429// including WriteBack and AsyncWriteBack.
430Mode int
431
432// MaxReplicas is the maximum number of
433// replicas of an object cache in seed peers.
434MaxReplicas int
435
436// Reader is reader of object.
437Reader io.Reader
438}
439
440// Validate validates PutObjectInput fields.
441func (i *PutObjectInput) Validate() error {
442if i.BucketName == "" {
443return errors.New("invalid BucketName")
444}
445
446if i.ObjectKey == "" {
447return errors.New("invalid ObjectKey")
448}
449
450if i.Mode != objectstorage.WriteBack && i.Mode != objectstorage.AsyncWriteBack {
451return errors.New("invalid Mode")
452}
453
454if i.MaxReplicas < 0 || i.MaxReplicas > 100 {
455return errors.New("invalid MaxReplicas")
456}
457
458return nil
459}
460
461// PutObjectRequestWithContext returns *http.Request of putting object.
462func (dfs *dfstore) PutObjectRequestWithContext(ctx context.Context, input *PutObjectInput) (*http.Request, error) {
463if err := input.Validate(); err != nil {
464return nil, err
465}
466
467body := &bytes.Buffer{}
468writer := multipart.NewWriter(body)
469
470// AsyncWriteBack mode is used by default.
471if err := writer.WriteField("mode", fmt.Sprint(input.Mode)); err != nil {
472return nil, err
473}
474
475if input.Filter != "" {
476if err := writer.WriteField("filter", input.Filter); err != nil {
477return nil, err
478}
479}
480
481if input.MaxReplicas > 0 {
482if err := writer.WriteField("maxReplicas", fmt.Sprint(input.MaxReplicas)); err != nil {
483return nil, err
484}
485}
486
487part, err := writer.CreateFormFile("file", filepath.Base(input.ObjectKey))
488if err != nil {
489return nil, err
490}
491
492buf := make([]byte, DefaultPutObjectBufferSize)
493if _, err := io.CopyBuffer(part, input.Reader, buf); err != nil {
494return nil, err
495}
496
497if err := writer.Close(); err != nil {
498return nil, err
499}
500
501u, err := url.Parse(dfs.endpoint)
502if err != nil {
503return nil, err
504}
505
506u.Path = filepath.Join("buckets", input.BucketName, "objects", input.ObjectKey)
507
508if strings.HasSuffix(input.ObjectKey, "/") {
509u.Path += "/"
510}
511
512req, err := http.NewRequestWithContext(ctx, http.MethodPut, u.String(), body)
513if err != nil {
514return nil, err
515}
516req.Header.Add(headers.ContentType, writer.FormDataContentType())
517
518return req, nil
519}
520
521// PutObjectWithContext puts data of object.
522func (dfs *dfstore) PutObjectWithContext(ctx context.Context, input *PutObjectInput) error {
523req, err := dfs.PutObjectRequestWithContext(ctx, input)
524if err != nil {
525return err
526}
527
528resp, err := http.DefaultClient.Do(req)
529if err != nil {
530return err
531}
532defer resp.Body.Close()
533
534if resp.StatusCode/100 != 2 {
535return fmt.Errorf("bad response status %s", resp.Status)
536}
537
538return nil
539}
540
541// CopyObjectInput is used to construct request of copying object.
542type CopyObjectInput struct {
543// BucketName is bucket name.
544BucketName string
545
546// SourceObjectKey is the key of object to be copied.
547SourceObjectKey string
548
549// DestinationObjectKey is the object key of the destination.
550DestinationObjectKey string
551}
552
553// Validate validates CopyObjectInput fields.
554func (i *CopyObjectInput) Validate() error {
555if i.BucketName == "" {
556return errors.New("invalid BucketName")
557}
558
559if i.SourceObjectKey == "" {
560return errors.New("invalid SourceObjectKey")
561}
562
563if i.DestinationObjectKey == "" {
564return errors.New("invalid DestinationObjectKey")
565}
566
567return nil
568}
569
570// CopyObjectWithContext copy object from source to destination.
571func (dfs *dfstore) CopyObjectWithContext(ctx context.Context, input *CopyObjectInput) error {
572req, err := dfs.CopyObjectRequestWithContext(ctx, input)
573if err != nil {
574return err
575}
576
577resp, err := http.DefaultClient.Do(req)
578if err != nil {
579return err
580}
581defer resp.Body.Close()
582
583if resp.StatusCode/100 != 2 {
584return fmt.Errorf("bad response status %s", resp.Status)
585}
586
587return nil
588}
589
590// CopyObjectRequestWithContext returns *http.Request of copying object.
591func (dfs *dfstore) CopyObjectRequestWithContext(ctx context.Context, input *CopyObjectInput) (*http.Request, error) {
592if err := input.Validate(); err != nil {
593return nil, err
594}
595
596body := &bytes.Buffer{}
597writer := multipart.NewWriter(body)
598
599if err := writer.WriteField("source_object_key", input.SourceObjectKey); err != nil {
600return nil, err
601}
602
603if err := writer.Close(); err != nil {
604return nil, err
605}
606
607u, err := url.Parse(dfs.endpoint)
608if err != nil {
609return nil, err
610}
611
612u.Path = filepath.Join("buckets", input.BucketName, "objects", input.DestinationObjectKey)
613
614query := u.Query()
615
616u.RawQuery = query.Encode()
617
618req, err := http.NewRequestWithContext(ctx, http.MethodPut, u.String(), body)
619if err != nil {
620return nil, err
621}
622
623req.Header.Add(headers.ContentType, writer.FormDataContentType())
624req.Header.Add(config.HeaderDragonflyObjectOperation, fmt.Sprint(objectstorage.CopyOperation))
625return req, nil
626}
627
628// CreateBucketInput is used to construct request of creating bucket.
629type CreateBucketInput struct {
630// BucketName is bucket name.
631BucketName string
632}
633
634// Validate validates CreateBucketInput fields.
635func (i *CreateBucketInput) Validate() error {
636if i.BucketName == "" {
637return errors.New("invalid BucketName")
638}
639
640return nil
641}
642
643// CreateBucketWithContext creates bucket.
644func (dfs *dfstore) CreateBucketWithContext(ctx context.Context, input *CreateBucketInput) error {
645req, err := dfs.CreateBucketRequestWithContext(ctx, input)
646if err != nil {
647return err
648}
649
650resp, err := http.DefaultClient.Do(req)
651if err != nil {
652return err
653}
654defer resp.Body.Close()
655
656if resp.StatusCode/100 != 2 {
657return fmt.Errorf("bad response status %s", resp.Status)
658}
659
660return nil
661}
662
663// CreateBucketRequestWithContext returns *http.Request of creating bucket.
664func (dfs *dfstore) CreateBucketRequestWithContext(ctx context.Context, input *CreateBucketInput) (*http.Request, error) {
665if err := input.Validate(); err != nil {
666return nil, err
667}
668
669u, err := url.Parse(dfs.endpoint)
670if err != nil {
671return nil, err
672}
673
674u.Path = filepath.Join("buckets", input.BucketName)
675
676query := u.Query()
677
678u.RawQuery = query.Encode()
679
680return http.NewRequestWithContext(ctx, http.MethodPost, u.String(), nil)
681}
682
683// DeleteObjectInput is used to construct request of deleting object.
684type DeleteObjectInput struct {
685// BucketName is bucket name.
686BucketName string
687
688// ObjectKey is object key.
689ObjectKey string
690}
691
692// Validate validates DeleteObjectInput fields.
693func (i *DeleteObjectInput) Validate() error {
694if i.BucketName == "" {
695return errors.New("invalid BucketName")
696}
697
698if i.ObjectKey == "" {
699return errors.New("invalid ObjectKey")
700}
701
702return nil
703}
704
705// DeleteObjectRequestWithContext returns *http.Request of deleting object.
706func (dfs *dfstore) DeleteObjectRequestWithContext(ctx context.Context, input *DeleteObjectInput) (*http.Request, error) {
707if err := input.Validate(); err != nil {
708return nil, err
709}
710
711u, err := url.Parse(dfs.endpoint)
712if err != nil {
713return nil, err
714}
715
716u.Path = filepath.Join("buckets", input.BucketName, "objects", input.ObjectKey)
717
718if strings.HasSuffix(input.ObjectKey, "/") {
719u.Path += "/"
720}
721
722return http.NewRequestWithContext(ctx, http.MethodDelete, u.String(), nil)
723}
724
725// DeleteObjectWithContext deletes data of object.
726func (dfs *dfstore) DeleteObjectWithContext(ctx context.Context, input *DeleteObjectInput) error {
727req, err := dfs.DeleteObjectRequestWithContext(ctx, input)
728if err != nil {
729return err
730}
731
732resp, err := http.DefaultClient.Do(req)
733if err != nil {
734return err
735}
736defer resp.Body.Close()
737
738if resp.StatusCode/100 != 2 {
739return fmt.Errorf("bad response status %s", resp.Status)
740}
741
742return nil
743}
744
745// IsObjectExistInput is used to construct request of heading object.
746type IsObjectExistInput struct {
747// BucketName is bucket name.
748BucketName string
749
750// ObjectKey is object key.
751ObjectKey string
752}
753
754// Validate validates IsObjectExistInput fields.
755func (i *IsObjectExistInput) Validate() error {
756if i.BucketName == "" {
757return errors.New("invalid BucketName")
758}
759
760if i.ObjectKey == "" {
761return errors.New("invalid ObjectKey")
762}
763
764return nil
765}
766
767// IsObjectExistRequestWithContext returns *http.Request of heading object.
768func (dfs *dfstore) IsObjectExistRequestWithContext(ctx context.Context, input *IsObjectExistInput) (*http.Request, error) {
769if err := input.Validate(); err != nil {
770return nil, err
771}
772
773u, err := url.Parse(dfs.endpoint)
774if err != nil {
775return nil, err
776}
777
778u.Path = filepath.Join("buckets", input.BucketName, "objects", input.ObjectKey)
779
780if strings.HasSuffix(input.ObjectKey, "/") {
781u.Path += "/"
782}
783
784return http.NewRequestWithContext(ctx, http.MethodHead, u.String(), nil)
785}
786
787// IsObjectExistWithContext returns whether the object exists.
788func (dfs *dfstore) IsObjectExistWithContext(ctx context.Context, input *IsObjectExistInput) (bool, error) {
789req, err := dfs.IsObjectExistRequestWithContext(ctx, input)
790if err != nil {
791return false, err
792}
793
794resp, err := http.DefaultClient.Do(req)
795if err != nil {
796return false, err
797}
798defer resp.Body.Close()
799
800if resp.StatusCode == http.StatusNotFound {
801return false, nil
802}
803
804if resp.StatusCode/100 != 2 {
805return false, fmt.Errorf("bad response status %s", resp.Status)
806}
807
808return true, nil
809}
810