wal-g

Форк
0
/
splitreader.go 
51 строка · 1.1 Кб
1
package splitmerge
2

3
import (
4
	"context"
5
	"io"
6

7
	"github.com/wal-g/tracelog"
8
)
9

10
func SplitReader(ctx context.Context, reader io.Reader, parts int, blockSize int) []io.Reader {
11
	result := make([]io.Reader, 0)
12
	channels := make([]chan []byte, 0)
13

14
	for i := 0; i < parts; i++ {
15
		channels = append(channels, make(chan []byte))
16
		result = append(result, NewChannelReader(channels[i]))
17
	}
18

19
	// start SplitReader:
20
	go func() {
21
		idx := 0
22
		for {
23
			block := make([]byte, blockSize)
24
			bytes, err := io.ReadFull(reader, block)
25
			if bytes != 0 {
26
				select {
27
				case channels[idx] <- block[0:bytes]:
28
				case <-ctx.Done():
29
					for i := 0; i < parts; i++ {
30
						close(channels[i])
31
					}
32
					tracelog.InfoLogger.Println("SplitReader closed until the end of the work")
33
					return
34
				}
35

36
				if bytes != blockSize {
37
					tracelog.InfoLogger.Printf("SplitReader. #%d send: %d / %d bytes", idx, bytes, blockSize)
38
				}
39
			}
40
			if err == io.ErrUnexpectedEOF || err == io.EOF {
41
				for _, ch := range channels {
42
					close(ch)
43
				}
44
				return
45
			}
46
			idx = (idx + 1) % len(channels)
47
		}
48
	}()
49

50
	return result
51
}
52

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.