2
// __ _____ __ ___ ___ __ _| |_ ___
3
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
4
// \ V V / __/ (_| |\ V /| | (_| | || __/
5
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
7
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
9
// CONTACT: hello@weaviate.io
23
"github.com/minio/minio-go/v7"
24
"github.com/minio/minio-go/v7/pkg/credentials"
25
"github.com/pkg/errors"
26
"github.com/sirupsen/logrus"
27
"github.com/weaviate/weaviate/entities/backup"
28
"github.com/weaviate/weaviate/usecases/monitoring"
34
logger logrus.FieldLogger
38
func newClient(config *clientConfig, logger logrus.FieldLogger, dataPath string) (*s3Client, error) {
39
region := os.Getenv("AWS_REGION")
41
region = os.Getenv("AWS_DEFAULT_REGION")
44
var creds *credentials.Credentials
45
if (os.Getenv("AWS_ACCESS_KEY_ID") != "" || os.Getenv("AWS_ACCESS_KEY") != "") &&
46
(os.Getenv("AWS_SECRET_ACCESS_KEY") != "" || os.Getenv("AWS_SECRET_KEY") != "") {
47
creds = credentials.NewEnvAWS()
49
creds = credentials.NewIAM("")
50
if _, err := creds.Get(); err != nil {
51
// can be anonymous access
52
creds = credentials.NewEnvAWS()
56
client, err := minio.New(config.Endpoint, &minio.Options{
59
Secure: config.UseSSL,
62
return nil, errors.Wrap(err, "create client")
64
return &s3Client{client, config, logger, dataPath}, nil
67
func (s *s3Client) makeObjectName(parts ...string) string {
68
base := path.Join(parts...)
69
return path.Join(s.config.BackupPath, base)
72
func (s *s3Client) HomeDir(backupID string) string {
73
return "s3://" + path.Join(s.config.Bucket,
74
s.makeObjectName(backupID))
77
func (s *s3Client) GetObject(ctx context.Context, backupID, key string) ([]byte, error) {
78
objectName := s.makeObjectName(backupID, key)
80
if err := ctx.Err(); err != nil {
81
return nil, backup.NewErrContextExpired(errors.Wrapf(err, "get object '%s'", objectName))
84
obj, err := s.client.GetObject(ctx, s.config.Bucket, objectName, minio.GetObjectOptions{})
86
return nil, backup.NewErrInternal(errors.Wrapf(err, "get object '%s'", objectName))
89
contents, err := io.ReadAll(obj)
91
if s3Err, ok := err.(minio.ErrorResponse); ok && s3Err.StatusCode == http.StatusNotFound {
92
return nil, backup.NewErrNotFound(errors.Wrapf(err, "get object '%s'", objectName))
94
return nil, backup.NewErrInternal(errors.Wrapf(err, "get object '%s'", objectName))
97
metric, err := monitoring.GetMetrics().BackupRestoreDataTransferred.GetMetricWithLabelValues(Name, "class")
99
metric.Add(float64(len(contents)))
105
func (s *s3Client) PutFile(ctx context.Context, backupID, key string, srcPath string) error {
106
objectName := s.makeObjectName(backupID, key)
107
srcPath = path.Join(s.dataPath, srcPath)
108
opt := minio.PutObjectOptions{ContentType: "application/octet-stream"}
110
_, err := s.client.FPutObject(ctx, s.config.Bucket, objectName, srcPath, opt)
112
return backup.NewErrInternal(
113
errors.Wrapf(err, "put file '%s'", objectName))
117
file, err := os.Stat(srcPath)
123
metric, err := monitoring.GetMetrics().BackupStoreDataTransferred.GetMetricWithLabelValues(Name, "class")
125
metric.Add(float64(size))
130
func (s *s3Client) PutObject(ctx context.Context, backupID, key string, byes []byte) error {
131
objectName := s.makeObjectName(backupID, key)
132
opt := minio.PutObjectOptions{ContentType: "application/octet-stream"}
133
reader := bytes.NewReader(byes)
134
objectSize := int64(len(byes))
136
_, err := s.client.PutObject(ctx, s.config.Bucket, objectName, reader, objectSize, opt)
138
return backup.NewErrInternal(
139
errors.Wrapf(err, "put object '%s'", objectName))
142
metric, err := monitoring.GetMetrics().BackupStoreDataTransferred.GetMetricWithLabelValues(Name, "class")
144
metric.Add(float64(len(byes)))
149
func (s *s3Client) Initialize(ctx context.Context, backupID string) error {
150
key := "access-check"
152
if err := s.PutObject(ctx, backupID, key, []byte("")); err != nil {
153
return errors.Wrap(err, "failed to access-check s3 backup module")
156
objectName := s.makeObjectName(backupID, key)
157
opt := minio.RemoveObjectOptions{}
158
if err := s.client.RemoveObject(ctx, s.config.Bucket, objectName, opt); err != nil {
159
return errors.Wrap(err, "failed to remove access-check s3 backup module")
165
// WriteFile downloads contents of an object to a local file destPath
166
func (s *s3Client) WriteToFile(ctx context.Context, backupID, key, destPath string) error {
167
object := s.makeObjectName(backupID, key)
168
err := s.client.FGetObject(ctx, s.config.Bucket, object, destPath, minio.GetObjectOptions{})
170
return fmt.Errorf("s3.FGetObject %q %q: %w", destPath, object, err)
173
if st, err := os.Stat(destPath); err == nil {
174
metric, err := monitoring.GetMetrics().BackupRestoreDataTransferred.GetMetricWithLabelValues(Name, "class")
176
metric.Add(float64(st.Size()))
182
func (s *s3Client) Write(ctx context.Context, backupID, key string, r io.ReadCloser) (int64, error) {
184
path := s.makeObjectName(backupID, key)
185
opt := minio.PutObjectOptions{
186
ContentType: "application/octet-stream",
187
DisableMultipart: false,
190
info, err := s.client.PutObject(ctx, s.config.Bucket, path, r, -1, opt)
192
return info.Size, fmt.Errorf("write object %q", path)
195
if metric, err := monitoring.GetMetrics().BackupStoreDataTransferred.
196
GetMetricWithLabelValues(Name, "class"); err == nil {
197
metric.Add(float64(float64(info.Size)))
199
return info.Size, nil
202
func (s *s3Client) Read(ctx context.Context, backupID, key string, w io.WriteCloser) (int64, error) {
204
path := s.makeObjectName(backupID, key)
205
obj, err := s.client.GetObject(ctx, s.config.Bucket, path, minio.GetObjectOptions{})
207
return 0, fmt.Errorf("get object %q: %w", path, err)
210
read, err := io.Copy(w, obj)
212
err = fmt.Errorf("get object %q: %w", path, err)
213
if s3Err, ok := err.(minio.ErrorResponse); ok && s3Err.StatusCode == http.StatusNotFound {
214
err = backup.NewErrNotFound(err)
219
if metric, err := monitoring.GetMetrics().BackupRestoreDataTransferred.
220
GetMetricWithLabelValues(Name, "class"); err == nil {
221
metric.Add(float64(float64(read)))
227
func (s *s3Client) SourceDataPath() string {