// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package pgzip implements reading and writing of gzip format compressed files,
// as specified in RFC 1952.
//
// This is a drop in replacement for "compress/gzip".
// This will split compression into blocks that are compressed in parallel.
// This can be useful for compressing big amounts of data.
// The gzip decompression has not been modified, but remains in the package,
// so you can use it as a complete replacement for "compress/gzip".
//
// See more at https://github.com/klauspost/pgzip
package pgzip
import (
	"bufio"
	"encoding/binary"
	"errors"
	"hash"
	"hash/crc32"
	"io"
	"sync"
	"time"

	"github.com/klauspost/compress/flate"
)

const (
30
	gzipID1     = 0x1f
31
	gzipID2     = 0x8b
32
	gzipDeflate = 8
33
	flagText    = 1 << 0
34
	flagHdrCrc  = 1 << 1
35
	flagExtra   = 1 << 2
36
	flagName    = 1 << 3
37
	flagComment = 1 << 4
38
)
39

40
func makeReader(r io.Reader) flate.Reader {
41
	if rr, ok := r.(flate.Reader); ok {
42
		return rr
43
	}
44
	return bufio.NewReader(r)
45
}
46

47
var (
48
	// ErrChecksum is returned when reading GZIP data that has an invalid checksum.
49
	ErrChecksum = errors.New("gzip: invalid checksum")
50
	// ErrHeader is returned when reading GZIP data that has an invalid header.
51
	ErrHeader = errors.New("gzip: invalid header")
52
)
53

54
// The gzip file stores a header giving metadata about the compressed file.
55
// That header is exposed as the fields of the Writer and Reader structs.
56
type Header struct {
57
	Comment string    // comment
58
	Extra   []byte    // "extra data"
59
	ModTime time.Time // modification time
60
	Name    string    // file name
61
	OS      byte      // operating system type
62
}
63

64
// A Reader is an io.Reader that can be read to retrieve
65
// uncompressed data from a gzip-format compressed file.
66
//
67
// In general, a gzip file can be a concatenation of gzip files,
68
// each with its own header.  Reads from the Reader
69
// return the concatenation of the uncompressed data of each.
70
// Only the first header is recorded in the Reader fields.
71
//
72
// Gzip files store a length and checksum of the uncompressed data.
73
// The Reader will return a ErrChecksum when Read
74
// reaches the end of the uncompressed data if it does not
75
// have the expected length or checksum.  Clients should treat data
76
// returned by Read as tentative until they receive the io.EOF
77
// marking the end of the data.
78
type Reader struct {
79
	Header
80
	r            flate.Reader
81
	decompressor io.ReadCloser
82
	digest       hash.Hash32
83
	size         uint32
84
	flg          byte
85
	buf          [512]byte
86
	err          error
87
	closeErr     chan error
88
	multistream  bool
89

90
	readAhead   chan read
91
	roff        int // read offset
92
	current     []byte
93
	closeReader chan struct{}
94
	lastBlock   bool
95
	blockSize   int
96
	blocks      int
97

98
	activeRA bool       // Indication if readahead is active
99
	mu       sync.Mutex // Lock for above
100

101
	blockPool chan []byte
102
}
103

104
type read struct {
105
	b   []byte
106
	err error
107
}
108

109
// NewReader creates a new Reader reading the given reader.
110
// The implementation buffers input and may read more data than necessary from r.
111
// It is the caller's responsibility to call Close on the Reader when done.
112
func NewReader(r io.Reader) (*Reader, error) {
113
	z := new(Reader)
114
	z.blocks = defaultBlocks
115
	z.blockSize = defaultBlockSize
116
	z.r = makeReader(r)
117
	z.digest = crc32.NewIEEE()
118
	z.multistream = true
119
	z.blockPool = make(chan []byte, z.blocks)
120
	for i := 0; i < z.blocks; i++ {
121
		z.blockPool <- make([]byte, z.blockSize)
122
	}
123
	if err := z.readHeader(true); err != nil {
124
		return nil, err
125
	}
126
	return z, nil
127
}
128

129
// NewReaderN creates a new Reader reading the given reader.
130
// The implementation buffers input and may read more data than necessary from r.
131
// It is the caller's responsibility to call Close on the Reader when done.
132
//
133
// With this you can control the approximate size of your blocks,
134
// as well as how many blocks you want to have prefetched.
135
//
136
// Default values for this is blockSize = 250000, blocks = 16,
137
// meaning up to 16 blocks of maximum 250000 bytes will be
138
// prefetched.
139
func NewReaderN(r io.Reader, blockSize, blocks int) (*Reader, error) {
140
	z := new(Reader)
141
	z.blocks = blocks
142
	z.blockSize = blockSize
143
	z.r = makeReader(r)
144
	z.digest = crc32.NewIEEE()
145
	z.multistream = true
146

147
	// Account for too small values
148
	if z.blocks <= 0 {
149
		z.blocks = defaultBlocks
150
	}
151
	if z.blockSize <= 512 {
152
		z.blockSize = defaultBlockSize
153
	}
154
	z.blockPool = make(chan []byte, z.blocks)
155
	for i := 0; i < z.blocks; i++ {
156
		z.blockPool <- make([]byte, z.blockSize)
157
	}
158
	if err := z.readHeader(true); err != nil {
159
		return nil, err
160
	}
161
	return z, nil
162
}
163

164
// Reset discards the Reader z's state and makes it equivalent to the
165
// result of its original state from NewReader, but reading from r instead.
166
// This permits reusing a Reader rather than allocating a new one.
167
func (z *Reader) Reset(r io.Reader) error {
168
	z.killReadAhead()
169
	z.r = makeReader(r)
170
	z.digest = crc32.NewIEEE()
171
	z.size = 0
172
	z.err = nil
173
	z.multistream = true
174

175
	// Account for uninitialized values
176
	if z.blocks <= 0 {
177
		z.blocks = defaultBlocks
178
	}
179
	if z.blockSize <= 512 {
180
		z.blockSize = defaultBlockSize
181
	}
182

183
	if z.blockPool == nil {
184
		z.blockPool = make(chan []byte, z.blocks)
185
		for i := 0; i < z.blocks; i++ {
186
			z.blockPool <- make([]byte, z.blockSize)
187
		}
188
	}
189

190
	return z.readHeader(true)
191
}
192

193
// Multistream controls whether the reader supports multistream files.
194
//
195
// If enabled (the default), the Reader expects the input to be a sequence
196
// of individually gzipped data streams, each with its own header and
197
// trailer, ending at EOF. The effect is that the concatenation of a sequence
198
// of gzipped files is treated as equivalent to the gzip of the concatenation
199
// of the sequence. This is standard behavior for gzip readers.
200
//
201
// Calling Multistream(false) disables this behavior; disabling the behavior
202
// can be useful when reading file formats that distinguish individual gzip
203
// data streams or mix gzip data streams with other data streams.
204
// In this mode, when the Reader reaches the end of the data stream,
205
// Read returns io.EOF. If the underlying reader implements io.ByteReader,
206
// it will be left positioned just after the gzip stream.
207
// To start the next stream, call z.Reset(r) followed by z.Multistream(false).
208
// If there is no next stream, z.Reset(r) will return io.EOF.
209
func (z *Reader) Multistream(ok bool) {
210
	z.multistream = ok
211
}
212

213
// GZIP (RFC 1952) is little-endian, unlike ZLIB (RFC 1950).
214
func get4(p []byte) uint32 {
215
	return uint32(p[0]) | uint32(p[1])<<8 | uint32(p[2])<<16 | uint32(p[3])<<24
216
}
217

218
func (z *Reader) readString() (string, error) {
219
	var err error
220
	needconv := false
221
	for i := 0; ; i++ {
222
		if i >= len(z.buf) {
223
			return "", ErrHeader
224
		}
225
		z.buf[i], err = z.r.ReadByte()
226
		if err != nil {
227
			return "", err
228
		}
229
		if z.buf[i] > 0x7f {
230
			needconv = true
231
		}
232
		if z.buf[i] == 0 {
233
			// GZIP (RFC 1952) specifies that strings are NUL-terminated ISO 8859-1 (Latin-1).
234
			if needconv {
235
				s := make([]rune, 0, i)
236
				for _, v := range z.buf[0:i] {
237
					s = append(s, rune(v))
238
				}
239
				return string(s), nil
240
			}
241
			return string(z.buf[0:i]), nil
242
		}
243
	}
244
}
245

246
func (z *Reader) read2() (uint32, error) {
247
	_, err := io.ReadFull(z.r, z.buf[0:2])
248
	if err != nil {
249
		return 0, err
250
	}
251
	return uint32(z.buf[0]) | uint32(z.buf[1])<<8, nil
252
}
253

254
func (z *Reader) readHeader(save bool) error {
255
	z.killReadAhead()
256

257
	_, err := io.ReadFull(z.r, z.buf[0:10])
258
	if err != nil {
259
		return err
260
	}
261
	if z.buf[0] != gzipID1 || z.buf[1] != gzipID2 || z.buf[2] != gzipDeflate {
262
		return ErrHeader
263
	}
264
	z.flg = z.buf[3]
265
	if save {
266
		z.ModTime = time.Unix(int64(get4(z.buf[4:8])), 0)
267
		// z.buf[8] is xfl, ignored
268
		z.OS = z.buf[9]
269
	}
270
	z.digest.Reset()
271
	z.digest.Write(z.buf[0:10])
272

273
	if z.flg&flagExtra != 0 {
274
		n, err := z.read2()
275
		if err != nil {
276
			return err
277
		}
278
		data := make([]byte, n)
279
		if _, err = io.ReadFull(z.r, data); err != nil {
280
			return err
281
		}
282
		if save {
283
			z.Extra = data
284
		}
285
	}
286

287
	var s string
288
	if z.flg&flagName != 0 {
289
		if s, err = z.readString(); err != nil {
290
			return err
291
		}
292
		if save {
293
			z.Name = s
294
		}
295
	}
296

297
	if z.flg&flagComment != 0 {
298
		if s, err = z.readString(); err != nil {
299
			return err
300
		}
301
		if save {
302
			z.Comment = s
303
		}
304
	}
305

306
	if z.flg&flagHdrCrc != 0 {
307
		n, err := z.read2()
308
		if err != nil {
309
			return err
310
		}
311
		sum := z.digest.Sum32() & 0xFFFF
312
		if n != sum {
313
			return ErrHeader
314
		}
315
	}
316

317
	z.digest.Reset()
318
	z.decompressor = flate.NewReader(z.r)
319
	z.doReadAhead()
320
	return nil
321
}
322

323
func (z *Reader) killReadAhead() error {
324
	z.mu.Lock()
325
	defer z.mu.Unlock()
326
	if z.activeRA {
327
		if z.closeReader != nil {
328
			close(z.closeReader)
329
		}
330

331
		// Wait for decompressor to be closed and return error, if any.
332
		e, ok := <-z.closeErr
333
		z.activeRA = false
334

335
		for blk := range z.readAhead {
336
			if blk.b != nil {
337
				z.blockPool <- blk.b
338
			}
339
		}
340
		if cap(z.current) > 0 {
341
			z.blockPool <- z.current
342
			z.current = nil
343
		}
344
		if !ok {
345
			// Channel is closed, so if there was any error it has already been returned.
346
			return nil
347
		}
348
		return e
349
	}
350
	return nil
351
}
352

353
// Starts readahead.
354
// Will return on error (including io.EOF)
355
// or when z.closeReader is closed.
356
func (z *Reader) doReadAhead() {
357
	z.mu.Lock()
358
	defer z.mu.Unlock()
359
	z.activeRA = true
360

361
	if z.blocks <= 0 {
362
		z.blocks = defaultBlocks
363
	}
364
	if z.blockSize <= 512 {
365
		z.blockSize = defaultBlockSize
366
	}
367
	ra := make(chan read, z.blocks)
368
	z.readAhead = ra
369
	closeReader := make(chan struct{}, 0)
370
	z.closeReader = closeReader
371
	z.lastBlock = false
372
	closeErr := make(chan error, 1)
373
	z.closeErr = closeErr
374
	z.size = 0
375
	z.roff = 0
376
	z.current = nil
377
	decomp := z.decompressor
378

379
	go func() {
380
		defer func() {
381
			closeErr <- decomp.Close()
382
			close(closeErr)
383
			close(ra)
384
		}()
385

386
		// We hold a local reference to digest, since
387
		// it way be changed by reset.
388
		digest := z.digest
389
		var wg sync.WaitGroup
390
		for {
391
			var buf []byte
392
			select {
393
			case buf = <-z.blockPool:
394
			case <-closeReader:
395
				return
396
			}
397
			buf = buf[0:z.blockSize]
398
			// Try to fill the buffer
399
			n, err := io.ReadFull(decomp, buf)
400
			if err == io.ErrUnexpectedEOF {
401
				if n > 0 {
402
					err = nil
403
				} else {
404
					// If we got zero bytes, we need to establish if
405
					// we reached end of stream or truncated stream.
406
					_, err = decomp.Read([]byte{})
407
					if err == io.EOF {
408
						err = nil
409
					}
410
				}
411
			}
412
			if n < len(buf) {
413
				buf = buf[0:n]
414
			}
415
			wg.Wait()
416
			wg.Add(1)
417
			go func() {
418
				digest.Write(buf)
419
				wg.Done()
420
			}()
421
			z.size += uint32(n)
422

423
			// If we return any error, out digest must be ready
424
			if err != nil {
425
				wg.Wait()
426
			}
427
			select {
428
			case z.readAhead <- read{b: buf, err: err}:
429
			case <-closeReader:
430
				// Sent on close, we don't care about the next results
431
				z.blockPool <- buf
432
				return
433
			}
434
			if err != nil {
435
				return
436
			}
437
		}
438
	}()
439
}
440

441
func (z *Reader) Read(p []byte) (n int, err error) {
442
	if z.err != nil {
443
		return 0, z.err
444
	}
445
	if len(p) == 0 {
446
		return 0, nil
447
	}
448

449
	for {
450
		if len(z.current) == 0 && !z.lastBlock {
451
			read := <-z.readAhead
452

453
			if read.err != nil {
454
				// If not nil, the reader will have exited
455
				z.closeReader = nil
456

457
				if read.err != io.EOF {
458
					z.err = read.err
459
					return
460
				}
461
				if read.err == io.EOF {
462
					z.lastBlock = true
463
					err = nil
464
				}
465
			}
466
			z.current = read.b
467
			z.roff = 0
468
		}
469
		avail := z.current[z.roff:]
470
		if len(p) >= len(avail) {
471
			// If len(p) >= len(current), return all content of current
472
			n = copy(p, avail)
473
			z.blockPool <- z.current
474
			z.current = nil
475
			if z.lastBlock {
476
				err = io.EOF
477
				break
478
			}
479
		} else {
480
			// We copy as much as there is space for
481
			n = copy(p, avail)
482
			z.roff += n
483
		}
484
		return
485
	}
486

487
	// Finished file; check checksum + size.
488
	if _, err := io.ReadFull(z.r, z.buf[0:8]); err != nil {
489
		z.err = err
490
		return 0, err
491
	}
492
	crc32, isize := get4(z.buf[0:4]), get4(z.buf[4:8])
493
	sum := z.digest.Sum32()
494
	if sum != crc32 || isize != z.size {
495
		z.err = ErrChecksum
496
		return 0, z.err
497
	}
498

499
	// File is ok; should we attempt reading one more?
500
	if !z.multistream {
501
		return 0, io.EOF
502
	}
503

504
	// Is there another?
505
	if err = z.readHeader(false); err != nil {
506
		z.err = err
507
		return
508
	}
509

510
	// Yes.  Reset and read from it.
511
	return z.Read(p)
512
}
513

514
func (z *Reader) WriteTo(w io.Writer) (n int64, err error) {
515
	total := int64(0)
516
	avail := z.current[z.roff:]
517
	if len(avail) != 0 {
518
		n, err := w.Write(avail)
519
		if n != len(avail) {
520
			return total, io.ErrShortWrite
521
		}
522
		total += int64(n)
523
		if err != nil {
524
			return total, err
525
		}
526
		z.blockPool <- z.current
527
		z.current = nil
528
	}
529
	for {
530
		if z.err != nil {
531
			return total, z.err
532
		}
533
		// We write both to output and digest.
534
		for {
535
			// Read from input
536
			read := <-z.readAhead
537
			if read.err != nil {
538
				// If not nil, the reader will have exited
539
				z.closeReader = nil
540

541
				if read.err != io.EOF {
542
					z.err = read.err
543
					return total, z.err
544
				}
545
				if read.err == io.EOF {
546
					z.lastBlock = true
547
					err = nil
548
				}
549
			}
550
			// Write what we got
551
			n, err := w.Write(read.b)
552
			if n != len(read.b) {
553
				return total, io.ErrShortWrite
554
			}
555
			total += int64(n)
556
			if err != nil {
557
				return total, err
558
			}
559
			// Put block back
560
			z.blockPool <- read.b
561
			if z.lastBlock {
562
				break
563
			}
564
		}
565

566
		// Finished file; check checksum + size.
567
		if _, err := io.ReadFull(z.r, z.buf[0:8]); err != nil {
568
			z.err = err
569
			return total, err
570
		}
571
		crc32, isize := get4(z.buf[0:4]), get4(z.buf[4:8])
572
		sum := z.digest.Sum32()
573
		if sum != crc32 || isize != z.size {
574
			z.err = ErrChecksum
575
			return total, z.err
576
		}
577
		// File is ok; should we attempt reading one more?
578
		if !z.multistream {
579
			return total, nil
580
		}
581

582
		// Is there another?
583
		err = z.readHeader(false)
584
		if err == io.EOF {
585
			return total, nil
586
		}
587
		if err != nil {
588
			z.err = err
589
			return total, err
590
		}
591
	}
592
}
593

594
// Close closes the Reader. It does not close the underlying io.Reader.
595
func (z *Reader) Close() error {
596
	return z.killReadAhead()
597
}
598
