podman
597 lines · 13.5 KB
1// Copyright 2009 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5// Package pgzip implements reading and writing of gzip format compressed files,
6// as specified in RFC 1952.
7//
8// This is a drop in replacement for "compress/gzip".
9// This will split compression into blocks that are compressed in parallel.
10// This can be useful for compressing big amounts of data.
11// The gzip decompression has not been modified, but remains in the package,
12// so you can use it as a complete replacement for "compress/gzip".
13//
14// See more at https://github.com/klauspost/pgzip
15package pgzip
16
import (
	"bufio"
	"encoding/binary"
	"errors"
	"hash"
	"hash/crc32"
	"io"
	"sync"
	"time"

	"github.com/klauspost/compress/flate"
)
28
const (
	gzipID1     = 0x1f // first magic byte of a gzip stream (RFC 1952)
	gzipID2     = 0x8b // second magic byte
	gzipDeflate = 8    // CM field: DEFLATE is the only defined compression method

	// FLG bit flags from the gzip header.
	flagText    = 1 << 0 // FTEXT: file is probably ASCII text
	flagHdrCrc  = 1 << 1 // FHCRC: a CRC16 of the header follows the optional fields
	flagExtra   = 1 << 2 // FEXTRA: an "extra" field is present
	flagName    = 1 << 3 // FNAME: a NUL-terminated original file name is present
	flagComment = 1 << 4 // FCOMMENT: a NUL-terminated comment is present
)
39
40func makeReader(r io.Reader) flate.Reader {
41if rr, ok := r.(flate.Reader); ok {
42return rr
43}
44return bufio.NewReader(r)
45}
46
var (
	// ErrChecksum is returned when reading GZIP data that has an invalid checksum.
	// It covers both a CRC-32 mismatch and a wrong ISIZE in the trailer.
	ErrChecksum = errors.New("gzip: invalid checksum")
	// ErrHeader is returned when reading GZIP data that has an invalid header.
	ErrHeader = errors.New("gzip: invalid header")
)
53
// The gzip file stores a header giving metadata about the compressed file.
// That header is exposed as the fields of the Writer and Reader structs.
type Header struct {
	Comment string    // comment (COMMENT field; Latin-1 per RFC 1952)
	Extra   []byte    // "extra data" (EXTRA field)
	ModTime time.Time // modification time (MTIME field)
	Name    string    // file name (NAME field; Latin-1 per RFC 1952)
	OS      byte      // operating system type (OS field)
}
63
// A Reader is an io.Reader that can be read to retrieve
// uncompressed data from a gzip-format compressed file.
//
// In general, a gzip file can be a concatenation of gzip files,
// each with its own header. Reads from the Reader
// return the concatenation of the uncompressed data of each.
// Only the first header is recorded in the Reader fields.
//
// Gzip files store a length and checksum of the uncompressed data.
// The Reader will return a ErrChecksum when Read
// reaches the end of the uncompressed data if it does not
// have the expected length or checksum. Clients should treat data
// returned by Read as tentative until they receive the io.EOF
// marking the end of the data.
type Reader struct {
	Header
	r            flate.Reader  // (buffered) source of compressed data
	decompressor io.ReadCloser // flate reader for the current stream
	digest       hash.Hash32   // CRC-32 (IEEE) of the uncompressed data
	size         uint32        // uncompressed byte count (mod 2^32), compared with ISIZE
	flg          byte          // FLG byte of the current header
	buf          [512]byte     // scratch space for header fields and trailer
	err          error         // sticky error; once set, Read returns it
	closeErr     chan error    // delivers the decompressor Close error from the readahead goroutine
	multistream  bool          // continue past the first stream (default true)

	readAhead   chan read     // decoded blocks produced by the readahead goroutine
	roff        int           // read offset
	current     []byte        // block currently being consumed by Read
	closeReader chan struct{} // closed to stop the readahead goroutine
	lastBlock   bool          // the final (io.EOF) block of the stream has been received
	blockSize   int           // target size of each decoded block
	blocks      int           // number of blocks to prefetch (capacity of readAhead)

	activeRA bool       // Indication if readahead is active
	mu       sync.Mutex // Lock for above

	blockPool chan []byte // reusable block buffers; also bounds outstanding blocks
}
103
// read is one unit handed from the readahead goroutine to the consumer:
// a decompressed block and the error (if any) that ended production.
type read struct {
	b   []byte // decompressed data; the buffer is returned to blockPool after use
	err error  // io.EOF marks the clean end of a stream
}
108
109// NewReader creates a new Reader reading the given reader.
110// The implementation buffers input and may read more data than necessary from r.
111// It is the caller's responsibility to call Close on the Reader when done.
112func NewReader(r io.Reader) (*Reader, error) {
113z := new(Reader)
114z.blocks = defaultBlocks
115z.blockSize = defaultBlockSize
116z.r = makeReader(r)
117z.digest = crc32.NewIEEE()
118z.multistream = true
119z.blockPool = make(chan []byte, z.blocks)
120for i := 0; i < z.blocks; i++ {
121z.blockPool <- make([]byte, z.blockSize)
122}
123if err := z.readHeader(true); err != nil {
124return nil, err
125}
126return z, nil
127}
128
129// NewReaderN creates a new Reader reading the given reader.
130// The implementation buffers input and may read more data than necessary from r.
131// It is the caller's responsibility to call Close on the Reader when done.
132//
133// With this you can control the approximate size of your blocks,
134// as well as how many blocks you want to have prefetched.
135//
136// Default values for this is blockSize = 250000, blocks = 16,
137// meaning up to 16 blocks of maximum 250000 bytes will be
138// prefetched.
139func NewReaderN(r io.Reader, blockSize, blocks int) (*Reader, error) {
140z := new(Reader)
141z.blocks = blocks
142z.blockSize = blockSize
143z.r = makeReader(r)
144z.digest = crc32.NewIEEE()
145z.multistream = true
146
147// Account for too small values
148if z.blocks <= 0 {
149z.blocks = defaultBlocks
150}
151if z.blockSize <= 512 {
152z.blockSize = defaultBlockSize
153}
154z.blockPool = make(chan []byte, z.blocks)
155for i := 0; i < z.blocks; i++ {
156z.blockPool <- make([]byte, z.blockSize)
157}
158if err := z.readHeader(true); err != nil {
159return nil, err
160}
161return z, nil
162}
163
164// Reset discards the Reader z's state and makes it equivalent to the
165// result of its original state from NewReader, but reading from r instead.
166// This permits reusing a Reader rather than allocating a new one.
167func (z *Reader) Reset(r io.Reader) error {
168z.killReadAhead()
169z.r = makeReader(r)
170z.digest = crc32.NewIEEE()
171z.size = 0
172z.err = nil
173z.multistream = true
174
175// Account for uninitialized values
176if z.blocks <= 0 {
177z.blocks = defaultBlocks
178}
179if z.blockSize <= 512 {
180z.blockSize = defaultBlockSize
181}
182
183if z.blockPool == nil {
184z.blockPool = make(chan []byte, z.blocks)
185for i := 0; i < z.blocks; i++ {
186z.blockPool <- make([]byte, z.blockSize)
187}
188}
189
190return z.readHeader(true)
191}
192
// Multistream controls whether the reader supports multistream files.
//
// If enabled (the default), the Reader expects the input to be a sequence
// of individually gzipped data streams, each with its own header and
// trailer, ending at EOF. The effect is that the concatenation of a sequence
// of gzipped files is treated as equivalent to the gzip of the concatenation
// of the sequence. This is standard behavior for gzip readers.
//
// Calling Multistream(false) disables this behavior; disabling the behavior
// can be useful when reading file formats that distinguish individual gzip
// data streams or mix gzip data streams with other data streams.
// In this mode, when the Reader reaches the end of the data stream,
// Read returns io.EOF. If the underlying reader implements io.ByteReader,
// it will be left positioned just after the gzip stream.
// To start the next stream, call z.Reset(r) followed by z.Multistream(false).
// If there is no next stream, z.Reset(r) will return io.EOF.
//
// The flag is read by Read without synchronization, so set it before
// the first Read on the stream.
func (z *Reader) Multistream(ok bool) {
	z.multistream = ok
}
212
// get4 decodes a little-endian uint32 from the first four bytes of p.
// GZIP (RFC 1952) is little-endian, unlike ZLIB (RFC 1950).
// Uses the stdlib decoder instead of hand-rolled shifts; it also gets
// a single bounds check instead of four.
func get4(p []byte) uint32 {
	return binary.LittleEndian.Uint32(p)
}
217
// readString reads a NUL-terminated string from the input.
// RFC 1952 specifies these strings as ISO 8859-1 (Latin-1), so bytes
// above 0x7f are widened rune-by-rune into valid UTF-8 rather than
// being interpreted as UTF-8 directly. The bytes are staged in z.buf;
// a string longer than that scratch buffer (512 bytes) is rejected
// with ErrHeader.
func (z *Reader) readString() (string, error) {
	var err error
	needconv := false
	for i := 0; ; i++ {
		if i >= len(z.buf) {
			// String exceeds the scratch buffer: treat as a bad header.
			return "", ErrHeader
		}
		z.buf[i], err = z.r.ReadByte()
		if err != nil {
			return "", err
		}
		if z.buf[i] > 0x7f {
			// Non-ASCII byte seen: a Latin-1 -> UTF-8 conversion is needed.
			needconv = true
		}
		if z.buf[i] == 0 {
			// GZIP (RFC 1952) specifies that strings are NUL-terminated ISO 8859-1 (Latin-1).
			if needconv {
				// Widen each Latin-1 byte to its identical code point.
				s := make([]rune, 0, i)
				for _, v := range z.buf[0:i] {
					s = append(s, rune(v))
				}
				return string(s), nil
			}
			// Pure ASCII: the raw bytes are already valid UTF-8.
			return string(z.buf[0:i]), nil
		}
	}
}
245
246func (z *Reader) read2() (uint32, error) {
247_, err := io.ReadFull(z.r, z.buf[0:2])
248if err != nil {
249return 0, err
250}
251return uint32(z.buf[0]) | uint32(z.buf[1])<<8, nil
252}
253
// readHeader reads the 10-byte fixed gzip header plus any optional
// fields (extra, name, comment, header CRC16) indicated by the FLG
// byte, then starts a fresh flate decompressor and the readahead
// goroutine for the stream body. If save is true, the parsed metadata
// is stored into z.Header. Any previous readahead is torn down first.
func (z *Reader) readHeader(save bool) error {
	z.killReadAhead()

	// Fixed header: ID1 ID2 CM FLG MTIME(4) XFL OS.
	_, err := io.ReadFull(z.r, z.buf[0:10])
	if err != nil {
		return err
	}
	if z.buf[0] != gzipID1 || z.buf[1] != gzipID2 || z.buf[2] != gzipDeflate {
		return ErrHeader
	}
	z.flg = z.buf[3]
	if save {
		z.ModTime = time.Unix(int64(get4(z.buf[4:8])), 0)
		// z.buf[8] is xfl, ignored
		z.OS = z.buf[9]
	}
	// The digest temporarily accumulates header bytes for the FHCRC check.
	z.digest.Reset()
	z.digest.Write(z.buf[0:10])

	// Optional extra field: 2-byte little-endian length, then data.
	if z.flg&flagExtra != 0 {
		n, err := z.read2()
		if err != nil {
			return err
		}
		data := make([]byte, n)
		if _, err = io.ReadFull(z.r, data); err != nil {
			return err
		}
		if save {
			z.Extra = data
		}
	}

	var s string
	// Optional NUL-terminated original file name.
	if z.flg&flagName != 0 {
		if s, err = z.readString(); err != nil {
			return err
		}
		if save {
			z.Name = s
		}
	}

	// Optional NUL-terminated comment.
	if z.flg&flagComment != 0 {
		if s, err = z.readString(); err != nil {
			return err
		}
		if save {
			z.Comment = s
		}
	}

	// Optional header CRC16: low 16 bits of a CRC-32 of the header.
	// NOTE(review): the digest here covers only the first 10 bytes,
	// but RFC 1952 defines FHCRC over all preceding header bytes
	// including extra/name/comment. A stream combining FHCRC with
	// those optional fields would fail this check — confirm against
	// upstream before relying on FHCRC validation.
	if z.flg&flagHdrCrc != 0 {
		n, err := z.read2()
		if err != nil {
			return err
		}
		sum := z.digest.Sum32() & 0xFFFF
		if n != sum {
			return ErrHeader
		}
	}

	// From here on the digest tracks the *uncompressed* payload,
	// written by the readahead goroutine.
	z.digest.Reset()
	z.decompressor = flate.NewReader(z.r)
	z.doReadAhead()
	return nil
}
322
// killReadAhead stops the readahead goroutine (if one is active),
// waits for the decompressor to be closed, and reclaims all in-flight
// block buffers back into the pool. It returns the decompressor's
// Close error, if one was reported. Safe to call when readahead is
// not running; z.mu guards the activeRA flag.
func (z *Reader) killReadAhead() error {
	z.mu.Lock()
	defer z.mu.Unlock()
	if z.activeRA {
		if z.closeReader != nil {
			close(z.closeReader)
		}

		// Wait for decompressor to be closed and return error, if any.
		e, ok := <-z.closeErr
		z.activeRA = false

		// Drain pending blocks so their buffers return to the pool.
		// readAhead is closed by the goroutine's deferred cleanup,
		// so this loop terminates.
		for blk := range z.readAhead {
			if blk.b != nil {
				z.blockPool <- blk.b
			}
		}
		// Return the partially-consumed block, if any, to the pool.
		if cap(z.current) > 0 {
			z.blockPool <- z.current
			z.current = nil
		}
		if !ok {
			// Channel is closed, so if there was any error it has already been returned.
			return nil
		}
		return e
	}
	return nil
}
352
// Starts readahead.
// Will return on error (including io.EOF)
// or when z.closeReader is closed.
//
// The spawned goroutine decompresses blocks of up to z.blockSize bytes
// and sends them on z.readAhead; the CRC-32 of each block is computed
// concurrently on a second goroutine, serialized by wg so digest
// writes stay in stream order. Buffer flow-control comes from
// blockPool: at most z.blocks buffers are outstanding at once.
func (z *Reader) doReadAhead() {
	z.mu.Lock()
	defer z.mu.Unlock()
	z.activeRA = true

	// Guard against uninitialized or out-of-range configuration.
	if z.blocks <= 0 {
		z.blocks = defaultBlocks
	}
	if z.blockSize <= 512 {
		z.blockSize = defaultBlockSize
	}
	ra := make(chan read, z.blocks)
	z.readAhead = ra
	closeReader := make(chan struct{}, 0)
	z.closeReader = closeReader
	z.lastBlock = false
	closeErr := make(chan error, 1)
	z.closeErr = closeErr
	z.size = 0
	z.roff = 0
	z.current = nil
	decomp := z.decompressor

	go func() {
		defer func() {
			// Always report the Close error and close both channels so
			// killReadAhead's drain loop terminates.
			closeErr <- decomp.Close()
			close(closeErr)
			close(ra)
		}()

		// We hold a local reference to digest, since
		// it may be changed by Reset.
		digest := z.digest
		var wg sync.WaitGroup
		for {
			// Take a buffer from the pool; this throttles readahead to
			// at most z.blocks outstanding blocks.
			var buf []byte
			select {
			case buf = <-z.blockPool:
			case <-closeReader:
				return
			}
			buf = buf[0:z.blockSize]
			// Try to fill the buffer
			n, err := io.ReadFull(decomp, buf)
			if err == io.ErrUnexpectedEOF {
				if n > 0 {
					// Partial final block: deliver the data without error.
					err = nil
				} else {
					// If we got zero bytes, we need to establish if
					// we reached end of stream or truncated stream.
					_, err = decomp.Read([]byte{})
					if err == io.EOF {
						err = nil
					}
				}
			}
			if n < len(buf) {
				buf = buf[0:n]
			}
			// Wait for the previous block's digest write before
			// starting this one, keeping CRC input in order.
			wg.Wait()
			wg.Add(1)
			go func() {
				digest.Write(buf)
				wg.Done()
			}()
			z.size += uint32(n)

			// If we return any error, our digest must be ready
			if err != nil {
				wg.Wait()
			}
			select {
			case z.readAhead <- read{b: buf, err: err}:
			case <-closeReader:
				// Sent on close, we don't care about the next results
				z.blockPool <- buf
				return
			}
			if err != nil {
				return
			}
		}
	}()
}
440
441func (z *Reader) Read(p []byte) (n int, err error) {
442if z.err != nil {
443return 0, z.err
444}
445if len(p) == 0 {
446return 0, nil
447}
448
449for {
450if len(z.current) == 0 && !z.lastBlock {
451read := <-z.readAhead
452
453if read.err != nil {
454// If not nil, the reader will have exited
455z.closeReader = nil
456
457if read.err != io.EOF {
458z.err = read.err
459return
460}
461if read.err == io.EOF {
462z.lastBlock = true
463err = nil
464}
465}
466z.current = read.b
467z.roff = 0
468}
469avail := z.current[z.roff:]
470if len(p) >= len(avail) {
471// If len(p) >= len(current), return all content of current
472n = copy(p, avail)
473z.blockPool <- z.current
474z.current = nil
475if z.lastBlock {
476err = io.EOF
477break
478}
479} else {
480// We copy as much as there is space for
481n = copy(p, avail)
482z.roff += n
483}
484return
485}
486
487// Finished file; check checksum + size.
488if _, err := io.ReadFull(z.r, z.buf[0:8]); err != nil {
489z.err = err
490return 0, err
491}
492crc32, isize := get4(z.buf[0:4]), get4(z.buf[4:8])
493sum := z.digest.Sum32()
494if sum != crc32 || isize != z.size {
495z.err = ErrChecksum
496return 0, z.err
497}
498
499// File is ok; should we attempt reading one more?
500if !z.multistream {
501return 0, io.EOF
502}
503
504// Is there another?
505if err = z.readHeader(false); err != nil {
506z.err = err
507return
508}
509
510// Yes. Reset and read from it.
511return z.Read(p)
512}
513
// WriteTo implements io.WriterTo: it streams all remaining
// uncompressed data to w. It first flushes whatever is left of the
// block Read was consuming, then forwards blocks from the readahead
// goroutine, verifying each stream's trailer (CRC-32 + ISIZE) and
// continuing across concatenated streams while multistream mode is
// enabled. Returns the total bytes written and the first error hit.
func (z *Reader) WriteTo(w io.Writer) (n int64, err error) {
	total := int64(0)
	// Flush the partially-consumed current block, if any.
	avail := z.current[z.roff:]
	if len(avail) != 0 {
		n, err := w.Write(avail)
		if n != len(avail) {
			return total, io.ErrShortWrite
		}
		total += int64(n)
		if err != nil {
			return total, err
		}
		z.blockPool <- z.current
		z.current = nil
	}
	for {
		if z.err != nil {
			return total, z.err
		}
		// We write both to output and digest.
		for {
			// Read from input
			read := <-z.readAhead
			if read.err != nil {
				// If not nil, the reader will have exited
				z.closeReader = nil

				if read.err != io.EOF {
					z.err = read.err
					return total, z.err
				}
				if read.err == io.EOF {
					// Final block of this stream; it carries no data.
					z.lastBlock = true
					err = nil
				}
			}
			// Write what we got
			n, err := w.Write(read.b)
			if n != len(read.b) {
				return total, io.ErrShortWrite
			}
			total += int64(n)
			if err != nil {
				return total, err
			}
			// Put block back
			z.blockPool <- read.b
			if z.lastBlock {
				break
			}
		}

		// Finished file; check checksum + size.
		if _, err := io.ReadFull(z.r, z.buf[0:8]); err != nil {
			z.err = err
			return total, err
		}
		crc32, isize := get4(z.buf[0:4]), get4(z.buf[4:8])
		sum := z.digest.Sum32()
		if sum != crc32 || isize != z.size {
			z.err = ErrChecksum
			return total, z.err
		}
		// File is ok; should we attempt reading one more?
		if !z.multistream {
			return total, nil
		}

		// Is there another?
		err = z.readHeader(false)
		if err == io.EOF {
			// Clean end of input: no further concatenated stream.
			return total, nil
		}
		if err != nil {
			z.err = err
			return total, err
		}
	}
}
593
// Close closes the Reader. It does not close the underlying io.Reader.
// It stops the readahead goroutine and reclaims its buffers; the
// returned error, if any, comes from closing the flate decompressor.
func (z *Reader) Close() error {
	return z.killReadAhead()
}
598