wal-g
83 строки · 1.7 Кб
1package ioextensions
2
3import (
4"io"
5
6"github.com/wal-g/tracelog"
7)
8
// ReaderWithRetry is an io.ReadCloser that obtains its underlying reader from
// getReader and, after a failed read, reopens the source and resumes from the
// number of bytes that were already handed to the caller.
type ReaderWithRetry struct {
	// reader is the currently open underlying reader. A nil value means a
	// new reader must be requested from getReader before the next read.
	reader io.ReadCloser
	// getReader opens a fresh reader. It is expected to start at the
	// beginning of the data, because alreadyRead bytes are skipped on it.
	getReader func() (io.ReadCloser, error)
	// retryAttempts is the bound that attempt is compared against.
	retryAttempts int
	// alreadyRead is the number of bytes delivered to the caller so far.
	alreadyRead int
	// attempt counts the failures seen so far; it is never reset.
	attempt int
}
16
17func NewReaderWithRetry(getReader func() (io.ReadCloser, error), retryAttempts int) io.ReadCloser {
18if retryAttempts <= 0 {
19retryAttempts = 1
20}
21return &ReaderWithRetry{
22reader: nil,
23getReader: getReader,
24retryAttempts: retryAttempts,
25alreadyRead: 0,
26attempt: 0,
27}
28}
29
30func (r *ReaderWithRetry) setupNewReader() error {
31reader, err := r.getReader()
32if err != nil {
33return err
34}
35_, err = io.CopyN(io.Discard, reader, int64(r.alreadyRead))
36r.reader = reader
37return err
38}
39
40func (r *ReaderWithRetry) Read(p []byte) (int, error) {
41n := 0
42var lastErr error
43for r.attempt < r.retryAttempts {
44if r.reader == nil {
45err := r.setupNewReader()
46if err == io.EOF {
47return n, err
48} else if err != nil {
49tracelog.ErrorLogger.Printf("error while initializing reader: %v", err)
50tracelog.ErrorLogger.PrintOnError(r.reader.Close())
51r.reader = nil
52r.attempt++
53continue
54}
55}
56
57read, err := r.reader.Read(p[n:])
58lastErr = err
59n += read
60r.alreadyRead += read
61
62if err == io.EOF {
63return n, err
64} else if err != nil {
65tracelog.ErrorLogger.Printf("error while read file: %v. Attempt: %d\n", err, r.attempt)
66tracelog.ErrorLogger.PrintOnError(r.reader.Close())
67r.reader = nil
68r.attempt++
69continue
70} else if n == len(p) {
71return n, nil
72}
73}
74
75return n, lastErr
76}
77
78func (r *ReaderWithRetry) Close() error {
79if r.reader != nil {
80return r.reader.Close()
81}
82return nil
83}
84