wal-g
51 строка · 1.1 Кб
1package splitmerge2
3import (4"context"5"io"6
7"github.com/wal-g/tracelog"8)
9
10func SplitReader(ctx context.Context, reader io.Reader, parts int, blockSize int) []io.Reader {11result := make([]io.Reader, 0)12channels := make([]chan []byte, 0)13
14for i := 0; i < parts; i++ {15channels = append(channels, make(chan []byte))16result = append(result, NewChannelReader(channels[i]))17}18
19// start SplitReader:20go func() {21idx := 022for {23block := make([]byte, blockSize)24bytes, err := io.ReadFull(reader, block)25if bytes != 0 {26select {27case channels[idx] <- block[0:bytes]:28case <-ctx.Done():29for i := 0; i < parts; i++ {30close(channels[i])31}32tracelog.InfoLogger.Println("SplitReader closed until the end of the work")33return34}35
36if bytes != blockSize {37tracelog.InfoLogger.Printf("SplitReader. #%d send: %d / %d bytes", idx, bytes, blockSize)38}39}40if err == io.ErrUnexpectedEOF || err == io.EOF {41for _, ch := range channels {42close(ch)43}44return45}46idx = (idx + 1) % len(channels)47}48}()49
50return result51}
52