11
"github.com/pkg/errors"
12
"github.com/wal-g/tracelog"
13
"github.com/wal-g/wal-g/internal/compression"
14
"github.com/wal-g/wal-g/internal/ioextensions"
15
"github.com/wal-g/wal-g/pkg/storages/storage"
16
"github.com/wal-g/wal-g/utility"
19
// ArchiveNonExistenceError reports that an archive with a given name does not
// exist. Values are built by newArchiveNonExistenceError around an error
// created with errors.Errorf, and the Error method reads that value through
// the `error` member.
// NOTE(review): the struct body is not visible in this view — confirm the
// field list before relying on this description.
type ArchiveNonExistenceError struct {
23
// newArchiveNonExistenceError builds an ArchiveNonExistenceError whose
// message names the archive that could not be found.
func newArchiveNonExistenceError(archiveName string) ArchiveNonExistenceError {
24
// The message text, including the trailing period and newline, is emitted at
// runtime as-is.
return ArchiveNonExistenceError{errors.Errorf("Archive '%s' does not exist.\n", archiveName)}
27
// Error implements the error interface: it renders the wrapped error using
// the format string returned by tracelog.GetErrorFormatter.
func (err ArchiveNonExistenceError) Error() string {
28
return fmt.Sprintf(tracelog.GetErrorFormatter(), err.error)
31
// DownloadFile fetches the object `filename` through reader, runs it through
// DecompressDecryptBytes (decrypt first, then decompress with the
// decompressor found for `ext`), and copies the result into writeCloser.
//
// It returns an error when no decompressor is registered for ext; a
// "file ... does not exist" error is also produced on one of its paths.
// NOTE(review): the conditions guarding the later returns and the error
// checks after each call are not visible in this view.
32
func DownloadFile(reader StorageFolderReader, filename, ext string, writeCloser io.WriteCloser) error {
33
// NOTE(review): as shown, writeCloser is closed here, before the copy at the
// end of the function writes to it. The other two closes in this function are
// deferred — confirm whether `defer` is intended on this line as well.
utility.LoggedClose(writeCloser, "")
35
decompressor := compression.FindDecompressor(ext)
36
if decompressor == nil {
37
return fmt.Errorf("decompressor for extension '%s' was not found", ext)
39
tracelog.DebugLogger.Printf("Found decompressor for %s", decompressor.FileExtension())
41
archiveReader, exists, err := TryDownloadFile(reader, filename)
46
// presumably guarded by a check on `exists` — the condition is not visible here.
return fmt.Errorf("file '%s' does not exist", filename)
48
defer utility.LoggedClose(archiveReader, "")
50
decompressedReader, err := DecompressDecryptBytes(archiveReader, decompressor)
54
defer utility.LoggedClose(decompressedReader, "")
56
// The destination is wrapped in utility.EmptyWriteIgnorer before copying.
_, err = utility.FastCopy(&utility.EmptyWriteIgnorer{Writer: writeCloser}, decompressedReader)
60
// TryDownloadFile opens the object at path via reader.ReadObject.
//
// The named results are the object's reader, a flag telling whether the
// object exists, and an error. An error whose cause (per errors.Cause) is a
// storage.ObjectNotFoundError is recognised separately from other failures.
// NOTE(review): the return statements are not visible in this view —
// presumably the not-found case yields exists=false with a nil error; confirm.
func TryDownloadFile(reader StorageFolderReader, path string) (fileReader io.ReadCloser, exists bool, err error) {
61
fileReader, err = reader.ReadObject(path)
66
if _, ok := errors.Cause(err).(storage.ObjectNotFoundError); ok {
72
// DecompressDecryptBytes decrypts archiveReader with DecryptBytes and then
// decompresses the decrypted stream with decompressor.
//
// A nil decompressor is allowed: the decrypted stream is then returned
// unchanged, wrapped in io.NopCloser so the result is still an io.ReadCloser.
// NOTE(review): the handling of the DecryptBytes error is not visible here.
func DecompressDecryptBytes(archiveReader io.Reader, decompressor compression.Decompressor) (io.ReadCloser, error) {
73
decryptReader, err := DecryptBytes(archiveReader)
77
if decompressor == nil {
78
tracelog.DebugLogger.Printf("No decompressor has been selected")
79
return io.NopCloser(decryptReader), nil
81
return decompressor.Decompress(decryptReader)
84
// DecryptBytes wraps archiveReader with the crypter obtained from
// ConfigureCrypter.
//
// One path logs that no crypter has been selected and returns archiveReader
// unchanged; otherwise the crypter's name is logged and crypter.Decrypt
// supplies the returned reader. A Decrypt failure is wrapped with
// "failed to init decrypt reader".
// NOTE(review): the condition selecting the pass-through path is not visible
// in this view — presumably `crypter == nil`; confirm.
func DecryptBytes(archiveReader io.Reader) (io.Reader, error) {
85
crypter := ConfigureCrypter()
87
tracelog.DebugLogger.Printf("No crypter has been selected")
88
return archiveReader, nil
91
tracelog.DebugLogger.Printf("Selected crypter: %s", crypter.Name())
93
decryptReader, err := crypter.Decrypt(archiveReader)
95
return nil, fmt.Errorf("failed to init decrypt reader: %w", err)
98
return decryptReader, nil
101
// CachedDecompressor is the record that GetLastDecompressor and
// SetLastDecompressor exchange with the on-disk cache file as JSON. Its
// FileExtension value identifies a decompressor by its file extension.
// NOTE(review): the field declarations are not visible in this view.
102
type CachedDecompressor struct {
106
// GetLastDecompressor loads the cached decompressor choice.
//
// It reads ".walg_decompressor_cache" from the current user's home directory,
// decodes the content as JSON into a CachedDecompressor, and looks up the
// decompressor for the stored extension with compression.FindDecompressor.
// NOTE(review): the error checks after user.Current, os.ReadFile and
// json.Unmarshal are not visible in this view.
func GetLastDecompressor() (compression.Decompressor, error) {
107
var cache CachedDecompressor
108
var cacheFilename string
110
usr, err := user.Current()
112
cacheFilename = filepath.Join(usr.HomeDir, ".walg_decompressor_cache")
113
file, err := os.ReadFile(cacheFilename)
115
err = json.Unmarshal(file, &cache)
119
// FindDecompressor's result is compared against nil by callers in this file,
// so the returned decompressor may be nil even when the error is nil.
return compression.FindDecompressor(cache.FileExtension), nil
127
// SetLastDecompressor stores decompressor's file extension in the cache file.
//
// The extension is placed in a CachedDecompressor, marshalled to JSON, and
// written with mode 0644 to ".walg_decompressor_cache" in the current user's
// home directory.
// NOTE(review): the error checks after user.Current and json.Marshal are not
// visible in this view.
func SetLastDecompressor(decompressor compression.Decompressor) error {
128
var cache CachedDecompressor
129
usr, err := user.Current()
135
cacheFilename := filepath.Join(usr.HomeDir, ".walg_decompressor_cache")
136
cache.FileExtension = decompressor.FileExtension()
138
marshal, err := json.Marshal(&cache)
140
return os.WriteFile(cacheFilename, marshal, 0644)
146
// convertDecompressorList builds a new slice that starts with
// lastDecompressor and is followed by every element of decompressors that is
// not equal to lastDecompressor, in their original order. The input slice is
// not modified.
// NOTE(review): the final return is not visible in this view — presumably
// `ret`.
func convertDecompressorList(decompressors []compression.Decompressor,
147
lastDecompressor compression.Decompressor) []compression.Decompressor {
148
// Fresh backing array with capacity len(decompressors); lastDecompressor goes first.
ret := append(make([]compression.Decompressor, 0, len(decompressors)), lastDecompressor)
150
for _, elem := range decompressors {
151
if elem != lastDecompressor {
152
ret = append(ret, elem)
159
// putCachedDecompressorInFirstPlace reorders decompressors so that the
// decompressor remembered by GetLastDecompressor comes first, when there is
// one and it is not already the first element.
// NOTE(review): the fallback return is not visible in this view — presumably
// the input slice unchanged.
func putCachedDecompressorInFirstPlace(decompressors []compression.Decompressor) []compression.Decompressor {
160
// The lookup error is discarded here; only a non-nil decompressor is acted on.
lastDecompressor, _ := GetLastDecompressor()
162
// NOTE(review): decompressors[0] is read without a visible length check, so
// an empty slice would panic whenever a cached decompressor is found —
// confirm callers never pass an empty list.
if lastDecompressor != nil && lastDecompressor != decompressors[0] {
163
return convertDecompressorList(decompressors, lastDecompressor)
170
// DownloadAndDecompressStorageFile locates fileName in storage with
// findDecompressorAndDownload, passes the stream through
// DecompressDecryptBytes, and returns a reader over the resulting data.
//
// The returned ioextensions.ReadCascadeCloser reads from the decompressed
// stream; its Closer is a multi-closer over both the raw archive reader and
// the decompressed reader.
// NOTE(review): the error checks after each call are not visible in this view.
func DownloadAndDecompressStorageFile(reader StorageFolderReader, fileName string) (io.ReadCloser, error) {
171
archiveReader, decompressor, err := findDecompressorAndDownload(reader, fileName)
176
decompressedReaded, err := DecompressDecryptBytes(archiveReader, decompressor)
178
// NOTE(review): presumably this close sits inside the error branch of the
// call above; if it ran unconditionally, the value returned below would wrap
// an already-closed archiveReader. The guard is not visible here — confirm.
utility.LoggedClose(archiveReader, "")
182
return ioextensions.ReadCascadeCloser{
183
Reader: decompressedReaded,
184
Closer: ioextensions.NewMultiCloser([]io.Closer{archiveReader, decompressedReaded}),
188
// findDecompressorAndDownload searches storage for fileName under each known
// compression extension and returns the opened reader together with the
// matching decompressor.
//
// Candidates come from compression.Decompressors, reordered by
// putCachedDecompressorInFirstPlace, and each is tried as
// fileName + "." + extension. A separate attempt uses the bare fileName and
// returns a nil decompressor. The last visible return yields an
// ArchiveNonExistenceError.
// NOTE(review): the `exists`/`err` checks that select between these returns
// are not visible in this view.
func findDecompressorAndDownload(reader StorageFolderReader, fileName string) (io.ReadCloser, compression.Decompressor, error) {
189
for _, decompressor := range putCachedDecompressorInFirstPlace(compression.Decompressors) {
190
archiveReader, exists, err := TryDownloadFile(reader, fileName+"."+decompressor.FileExtension())
197
// The decompressor is written to the cache; the error from that write is discarded.
_ = SetLastDecompressor(decompressor)
199
return archiveReader, decompressor, nil
202
fileReader, exists, err := TryDownloadFile(reader, fileName)
207
return fileReader, nil, nil
210
return nil, nil, newArchiveNonExistenceError(fileName)
214
// DownloadFileTo downloads a file and writes it to local file
215
func DownloadFileTo(folderReader StorageFolderReader, fileName string, dstPath string) error {
216
// Create file as soon as possible. It may be important due to race condition in wal-prefetch for PG.
217
file, err := os.OpenFile(dstPath, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_EXCL, 0666)
221
defer utility.LoggedClose(file, "")
223
reader, err := DownloadAndDecompressStorageFile(folderReader, fileName)
225
// We could not start upload - remove the file totally.
226
_ = os.Remove(dstPath)
229
defer utility.LoggedClose(reader, "")
231
_, err = utility.FastCopy(file, reader)
232
// In case of error we may have some content within file. Leave it alone.