weaviate

Форк
0
229 строк · 6.7 Кб
1
//                           _       _
2
// __      _____  __ ___   ___  __ _| |_ ___
3
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
4
//  \ V  V /  __/ (_| |\ V /| | (_| | ||  __/
5
//   \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
6
//
7
//  Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
8
//
9
//  CONTACT: hello@weaviate.io
10
//
11

12
package modstgs3
13

14
import (
15
	"bytes"
16
	"context"
17
	"fmt"
18
	"io"
19
	"net/http"
20
	"os"
21
	"path"
22

23
	"github.com/minio/minio-go/v7"
24
	"github.com/minio/minio-go/v7/pkg/credentials"
25
	"github.com/pkg/errors"
26
	"github.com/sirupsen/logrus"
27
	"github.com/weaviate/weaviate/entities/backup"
28
	"github.com/weaviate/weaviate/usecases/monitoring"
29
)
30

31
type s3Client struct {
32
	client   *minio.Client
33
	config   *clientConfig
34
	logger   logrus.FieldLogger
35
	dataPath string
36
}
37

38
func newClient(config *clientConfig, logger logrus.FieldLogger, dataPath string) (*s3Client, error) {
39
	region := os.Getenv("AWS_REGION")
40
	if len(region) == 0 {
41
		region = os.Getenv("AWS_DEFAULT_REGION")
42
	}
43

44
	var creds *credentials.Credentials
45
	if (os.Getenv("AWS_ACCESS_KEY_ID") != "" || os.Getenv("AWS_ACCESS_KEY") != "") &&
46
		(os.Getenv("AWS_SECRET_ACCESS_KEY") != "" || os.Getenv("AWS_SECRET_KEY") != "") {
47
		creds = credentials.NewEnvAWS()
48
	} else {
49
		creds = credentials.NewIAM("")
50
		if _, err := creds.Get(); err != nil {
51
			// can be anonymous access
52
			creds = credentials.NewEnvAWS()
53
		}
54
	}
55

56
	client, err := minio.New(config.Endpoint, &minio.Options{
57
		Creds:  creds,
58
		Region: region,
59
		Secure: config.UseSSL,
60
	})
61
	if err != nil {
62
		return nil, errors.Wrap(err, "create client")
63
	}
64
	return &s3Client{client, config, logger, dataPath}, nil
65
}
66

67
func (s *s3Client) makeObjectName(parts ...string) string {
68
	base := path.Join(parts...)
69
	return path.Join(s.config.BackupPath, base)
70
}
71

72
func (s *s3Client) HomeDir(backupID string) string {
73
	return "s3://" + path.Join(s.config.Bucket,
74
		s.makeObjectName(backupID))
75
}
76

77
func (s *s3Client) GetObject(ctx context.Context, backupID, key string) ([]byte, error) {
78
	objectName := s.makeObjectName(backupID, key)
79

80
	if err := ctx.Err(); err != nil {
81
		return nil, backup.NewErrContextExpired(errors.Wrapf(err, "get object '%s'", objectName))
82
	}
83

84
	obj, err := s.client.GetObject(ctx, s.config.Bucket, objectName, minio.GetObjectOptions{})
85
	if err != nil {
86
		return nil, backup.NewErrInternal(errors.Wrapf(err, "get object '%s'", objectName))
87
	}
88

89
	contents, err := io.ReadAll(obj)
90
	if err != nil {
91
		if s3Err, ok := err.(minio.ErrorResponse); ok && s3Err.StatusCode == http.StatusNotFound {
92
			return nil, backup.NewErrNotFound(errors.Wrapf(err, "get object '%s'", objectName))
93
		}
94
		return nil, backup.NewErrInternal(errors.Wrapf(err, "get object '%s'", objectName))
95
	}
96

97
	metric, err := monitoring.GetMetrics().BackupRestoreDataTransferred.GetMetricWithLabelValues(Name, "class")
98
	if err == nil {
99
		metric.Add(float64(len(contents)))
100
	}
101

102
	return contents, nil
103
}
104

105
func (s *s3Client) PutFile(ctx context.Context, backupID, key string, srcPath string) error {
106
	objectName := s.makeObjectName(backupID, key)
107
	srcPath = path.Join(s.dataPath, srcPath)
108
	opt := minio.PutObjectOptions{ContentType: "application/octet-stream"}
109

110
	_, err := s.client.FPutObject(ctx, s.config.Bucket, objectName, srcPath, opt)
111
	if err != nil {
112
		return backup.NewErrInternal(
113
			errors.Wrapf(err, "put file '%s'", objectName))
114
	}
115

116
	// Get filesize
117
	file, err := os.Stat(srcPath)
118
	if err != nil {
119
		return nil
120
	}
121
	size := file.Size()
122

123
	metric, err := monitoring.GetMetrics().BackupStoreDataTransferred.GetMetricWithLabelValues(Name, "class")
124
	if err == nil {
125
		metric.Add(float64(size))
126
	}
127
	return nil
128
}
129

130
func (s *s3Client) PutObject(ctx context.Context, backupID, key string, byes []byte) error {
131
	objectName := s.makeObjectName(backupID, key)
132
	opt := minio.PutObjectOptions{ContentType: "application/octet-stream"}
133
	reader := bytes.NewReader(byes)
134
	objectSize := int64(len(byes))
135

136
	_, err := s.client.PutObject(ctx, s.config.Bucket, objectName, reader, objectSize, opt)
137
	if err != nil {
138
		return backup.NewErrInternal(
139
			errors.Wrapf(err, "put object '%s'", objectName))
140
	}
141

142
	metric, err := monitoring.GetMetrics().BackupStoreDataTransferred.GetMetricWithLabelValues(Name, "class")
143
	if err == nil {
144
		metric.Add(float64(len(byes)))
145
	}
146
	return nil
147
}
148

149
func (s *s3Client) Initialize(ctx context.Context, backupID string) error {
150
	key := "access-check"
151

152
	if err := s.PutObject(ctx, backupID, key, []byte("")); err != nil {
153
		return errors.Wrap(err, "failed to access-check s3 backup module")
154
	}
155

156
	objectName := s.makeObjectName(backupID, key)
157
	opt := minio.RemoveObjectOptions{}
158
	if err := s.client.RemoveObject(ctx, s.config.Bucket, objectName, opt); err != nil {
159
		return errors.Wrap(err, "failed to remove access-check s3 backup module")
160
	}
161

162
	return nil
163
}
164

165
// WriteFile downloads contents of an object to a local file destPath
166
func (s *s3Client) WriteToFile(ctx context.Context, backupID, key, destPath string) error {
167
	object := s.makeObjectName(backupID, key)
168
	err := s.client.FGetObject(ctx, s.config.Bucket, object, destPath, minio.GetObjectOptions{})
169
	if err != nil {
170
		return fmt.Errorf("s3.FGetObject %q %q: %w", destPath, object, err)
171
	}
172

173
	if st, err := os.Stat(destPath); err == nil {
174
		metric, err := monitoring.GetMetrics().BackupRestoreDataTransferred.GetMetricWithLabelValues(Name, "class")
175
		if err == nil {
176
			metric.Add(float64(st.Size()))
177
		}
178
	}
179
	return nil
180
}
181

182
func (s *s3Client) Write(ctx context.Context, backupID, key string, r io.ReadCloser) (int64, error) {
183
	defer r.Close()
184
	path := s.makeObjectName(backupID, key)
185
	opt := minio.PutObjectOptions{
186
		ContentType:      "application/octet-stream",
187
		DisableMultipart: false,
188
	}
189

190
	info, err := s.client.PutObject(ctx, s.config.Bucket, path, r, -1, opt)
191
	if err != nil {
192
		return info.Size, fmt.Errorf("write object %q", path)
193
	}
194

195
	if metric, err := monitoring.GetMetrics().BackupStoreDataTransferred.
196
		GetMetricWithLabelValues(Name, "class"); err == nil {
197
		metric.Add(float64(float64(info.Size)))
198
	}
199
	return info.Size, nil
200
}
201

202
func (s *s3Client) Read(ctx context.Context, backupID, key string, w io.WriteCloser) (int64, error) {
203
	defer w.Close()
204
	path := s.makeObjectName(backupID, key)
205
	obj, err := s.client.GetObject(ctx, s.config.Bucket, path, minio.GetObjectOptions{})
206
	if err != nil {
207
		return 0, fmt.Errorf("get object %q: %w", path, err)
208
	}
209

210
	read, err := io.Copy(w, obj)
211
	if err != nil {
212
		err = fmt.Errorf("get object %q: %w", path, err)
213
		if s3Err, ok := err.(minio.ErrorResponse); ok && s3Err.StatusCode == http.StatusNotFound {
214
			err = backup.NewErrNotFound(err)
215
		}
216
		return 0, err
217
	}
218

219
	if metric, err := monitoring.GetMetrics().BackupRestoreDataTransferred.
220
		GetMetricWithLabelValues(Name, "class"); err == nil {
221
		metric.Add(float64(float64(read)))
222
	}
223

224
	return read, nil
225
}
226

227
func (s *s3Client) SourceDataPath() string {
228
	return s.dataPath
229
}
230

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.