wal-g

Форк
0
/
fetch_helper.go 
234 строки · 6.4 Кб
1
package internal
2

3
import (
4
	"encoding/json"
5
	"fmt"
6
	"io"
7
	"os"
8
	"os/user"
9
	"path/filepath"
10

11
	"github.com/pkg/errors"
12
	"github.com/wal-g/tracelog"
13
	"github.com/wal-g/wal-g/internal/compression"
14
	"github.com/wal-g/wal-g/internal/ioextensions"
15
	"github.com/wal-g/wal-g/pkg/storages/storage"
16
	"github.com/wal-g/wal-g/utility"
17
)
18

19
type ArchiveNonExistenceError struct {
20
	error
21
}
22

23
func newArchiveNonExistenceError(archiveName string) ArchiveNonExistenceError {
24
	return ArchiveNonExistenceError{errors.Errorf("Archive '%s' does not exist.\n", archiveName)}
25
}
26

27
func (err ArchiveNonExistenceError) Error() string {
28
	return fmt.Sprintf(tracelog.GetErrorFormatter(), err.error)
29
}
30

31
// DownloadFile downloads, decompresses and decrypts
32
func DownloadFile(reader StorageFolderReader, filename, ext string, writeCloser io.WriteCloser) error {
33
	utility.LoggedClose(writeCloser, "")
34

35
	decompressor := compression.FindDecompressor(ext)
36
	if decompressor == nil {
37
		return fmt.Errorf("decompressor for extension '%s' was not found", ext)
38
	}
39
	tracelog.DebugLogger.Printf("Found decompressor for %s", decompressor.FileExtension())
40

41
	archiveReader, exists, err := TryDownloadFile(reader, filename)
42
	if err != nil {
43
		return err
44
	}
45
	if !exists {
46
		return fmt.Errorf("file '%s' does not exist", filename)
47
	}
48
	defer utility.LoggedClose(archiveReader, "")
49

50
	decompressedReader, err := DecompressDecryptBytes(archiveReader, decompressor)
51
	if err != nil {
52
		return err
53
	}
54
	defer utility.LoggedClose(decompressedReader, "")
55

56
	_, err = utility.FastCopy(&utility.EmptyWriteIgnorer{Writer: writeCloser}, decompressedReader)
57
	return err
58
}
59

60
func TryDownloadFile(reader StorageFolderReader, path string) (fileReader io.ReadCloser, exists bool, err error) {
61
	fileReader, err = reader.ReadObject(path)
62
	if err == nil {
63
		exists = true
64
		return
65
	}
66
	if _, ok := errors.Cause(err).(storage.ObjectNotFoundError); ok {
67
		err = nil
68
	}
69
	return
70
}
71

72
func DecompressDecryptBytes(archiveReader io.Reader, decompressor compression.Decompressor) (io.ReadCloser, error) {
73
	decryptReader, err := DecryptBytes(archiveReader)
74
	if err != nil {
75
		return nil, err
76
	}
77
	if decompressor == nil {
78
		tracelog.DebugLogger.Printf("No decompressor has been selected")
79
		return io.NopCloser(decryptReader), nil
80
	}
81
	return decompressor.Decompress(decryptReader)
82
}
83

84
func DecryptBytes(archiveReader io.Reader) (io.Reader, error) {
85
	crypter := ConfigureCrypter()
86
	if crypter == nil {
87
		tracelog.DebugLogger.Printf("No crypter has been selected")
88
		return archiveReader, nil
89
	}
90

91
	tracelog.DebugLogger.Printf("Selected crypter: %s", crypter.Name())
92

93
	decryptReader, err := crypter.Decrypt(archiveReader)
94
	if err != nil {
95
		return nil, fmt.Errorf("failed to init decrypt reader: %w", err)
96
	}
97

98
	return decryptReader, nil
99
}
100

101
// CachedDecompressor is the file extension describing decompressor
102
type CachedDecompressor struct {
103
	FileExtension string
104
}
105

106
func GetLastDecompressor() (compression.Decompressor, error) {
107
	var cache CachedDecompressor
108
	var cacheFilename string
109

110
	usr, err := user.Current()
111
	if err == nil {
112
		cacheFilename = filepath.Join(usr.HomeDir, ".walg_decompressor_cache")
113
		file, err := os.ReadFile(cacheFilename)
114
		if err == nil {
115
			err = json.Unmarshal(file, &cache)
116
			if err != nil {
117
				return nil, err
118
			}
119
			return compression.FindDecompressor(cache.FileExtension), nil
120
		}
121
		return nil, err
122
	}
123

124
	return nil, nil
125
}
126

127
func SetLastDecompressor(decompressor compression.Decompressor) error {
128
	var cache CachedDecompressor
129
	usr, err := user.Current()
130

131
	if err != nil {
132
		return err
133
	}
134

135
	cacheFilename := filepath.Join(usr.HomeDir, ".walg_decompressor_cache")
136
	cache.FileExtension = decompressor.FileExtension()
137

138
	marshal, err := json.Marshal(&cache)
139
	if err == nil {
140
		return os.WriteFile(cacheFilename, marshal, 0644)
141
	}
142

143
	return err
144
}
145

146
func convertDecompressorList(decompressors []compression.Decompressor,
147
	lastDecompressor compression.Decompressor) []compression.Decompressor {
148
	ret := append(make([]compression.Decompressor, 0, len(decompressors)), lastDecompressor)
149

150
	for _, elem := range decompressors {
151
		if elem != lastDecompressor {
152
			ret = append(ret, elem)
153
		}
154
	}
155

156
	return ret
157
}
158

159
func putCachedDecompressorInFirstPlace(decompressors []compression.Decompressor) []compression.Decompressor {
160
	lastDecompressor, _ := GetLastDecompressor()
161

162
	if lastDecompressor != nil && lastDecompressor != decompressors[0] {
163
		return convertDecompressorList(decompressors, lastDecompressor)
164
	}
165

166
	return decompressors
167
}
168

169
// TODO : unit tests
170
func DownloadAndDecompressStorageFile(reader StorageFolderReader, fileName string) (io.ReadCloser, error) {
171
	archiveReader, decompressor, err := findDecompressorAndDownload(reader, fileName)
172
	if err != nil {
173
		return nil, err
174
	}
175

176
	decompressedReaded, err := DecompressDecryptBytes(archiveReader, decompressor)
177
	if err != nil {
178
		utility.LoggedClose(archiveReader, "")
179
		return nil, err
180
	}
181

182
	return ioextensions.ReadCascadeCloser{
183
		Reader: decompressedReaded,
184
		Closer: ioextensions.NewMultiCloser([]io.Closer{archiveReader, decompressedReaded}),
185
	}, nil
186
}
187

188
func findDecompressorAndDownload(reader StorageFolderReader, fileName string) (io.ReadCloser, compression.Decompressor, error) {
189
	for _, decompressor := range putCachedDecompressorInFirstPlace(compression.Decompressors) {
190
		archiveReader, exists, err := TryDownloadFile(reader, fileName+"."+decompressor.FileExtension())
191
		if err != nil {
192
			return nil, nil, err
193
		}
194
		if !exists {
195
			continue
196
		}
197
		_ = SetLastDecompressor(decompressor)
198

199
		return archiveReader, decompressor, nil
200
	}
201

202
	fileReader, exists, err := TryDownloadFile(reader, fileName)
203
	if err != nil {
204
		return nil, nil, err
205
	}
206
	if exists {
207
		return fileReader, nil, nil
208
	}
209

210
	return nil, nil, newArchiveNonExistenceError(fileName)
211
}
212

213
// TODO : unit tests
214
// DownloadFileTo downloads a file and writes it to local file
215
func DownloadFileTo(folderReader StorageFolderReader, fileName string, dstPath string) error {
216
	// Create file as soon as possible. It may be important due to race condition in wal-prefetch for PG.
217
	file, err := os.OpenFile(dstPath, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_EXCL, 0666)
218
	if err != nil {
219
		return err
220
	}
221
	defer utility.LoggedClose(file, "")
222

223
	reader, err := DownloadAndDecompressStorageFile(folderReader, fileName)
224
	if err != nil {
225
		// We could not start upload - remove the file totally.
226
		_ = os.Remove(dstPath)
227
		return err
228
	}
229
	defer utility.LoggedClose(reader, "")
230

231
	_, err = utility.FastCopy(file, reader)
232
	// In case of error we may have some content within file. Leave it alone.
233
	return err
234
}
235

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.