podman
519 строк · 12.9 Кб
1// Copyright 2010 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package pgzip
6
7import (
8"bytes"
9"errors"
10"fmt"
11"hash"
12"hash/crc32"
13"io"
14"runtime"
15"sync"
16"time"
17
18"github.com/klauspost/compress/flate"
19)
20
// Tuning defaults for the parallel compressor.
const (
	// defaultBlockSize is the number of uncompressed bytes collected into
	// one block before it is handed to a compression goroutine (1 MiB).
	defaultBlockSize = 1024 * 1024
	// tailSize is the number of trailing bytes of a block that are kept as
	// the dictionary for the block following it (16 KiB).
	tailSize = 16 * 1024
	// defaultBlocks is a fallback block count; it is not referenced
	// elsewhere in this file.
	defaultBlocks = 4
)
26
27// These constants are copied from the flate package, so that code that imports
28// "compress/gzip" does not also have to import "compress/flate".
29const (
30NoCompression = flate.NoCompression
31BestSpeed = flate.BestSpeed
32BestCompression = flate.BestCompression
33DefaultCompression = flate.DefaultCompression
34ConstantCompression = flate.ConstantCompression
35HuffmanOnly = flate.HuffmanOnly
36)
37
38// A Writer is an io.WriteCloser.
39// Writes to a Writer are compressed and written to w.
40type Writer struct {
41Header
42w io.Writer
43level int
44wroteHeader bool
45blockSize int
46blocks int
47currentBuffer []byte
48prevTail []byte
49digest hash.Hash32
50size int
51closed bool
52buf [10]byte
53errMu sync.RWMutex
54err error
55pushedErr chan struct{}
56results chan result
57dictFlatePool sync.Pool
58dstPool sync.Pool
59wg sync.WaitGroup
60}
61
// result tracks a single block on its way from a compression goroutine to
// the underlying writer.
type result struct {
	// result carries the compressed bytes of the block. It is closed by
	// compressBlock, without a value if compression failed.
	result chan []byte
	// notifyWritten is closed by the result-writing goroutine once it is
	// done with the block, whether the write succeeded or not.
	notifyWritten chan struct{}
}
66
67// Use SetConcurrency to finetune the concurrency level if needed.
68//
69// With this you can control the approximate size of your blocks,
70// as well as how many you want to be processing in parallel.
71//
72// Default values for this is SetConcurrency(defaultBlockSize, runtime.GOMAXPROCS(0)),
73// meaning blocks are split at 1 MB and up to the number of CPU threads
74// can be processing at once before the writer blocks.
75func (z *Writer) SetConcurrency(blockSize, blocks int) error {
76if blockSize <= tailSize {
77return fmt.Errorf("gzip: block size cannot be less than or equal to %d", tailSize)
78}
79if blocks <= 0 {
80return errors.New("gzip: blocks cannot be zero or less")
81}
82if blockSize == z.blockSize && blocks == z.blocks {
83return nil
84}
85z.blockSize = blockSize
86z.results = make(chan result, blocks)
87z.blocks = blocks
88z.dstPool.New = func() interface{} { return make([]byte, 0, blockSize+(blockSize)>>4) }
89return nil
90}
91
92// NewWriter returns a new Writer.
93// Writes to the returned writer are compressed and written to w.
94//
95// It is the caller's responsibility to call Close on the WriteCloser when done.
96// Writes may be buffered and not flushed until Close.
97//
98// Callers that wish to set the fields in Writer.Header must do so before
99// the first call to Write or Close. The Comment and Name header fields are
100// UTF-8 strings in Go, but the underlying format requires NUL-terminated ISO
101// 8859-1 (Latin-1). NUL or non-Latin-1 runes in those strings will lead to an
102// error on Write.
103func NewWriter(w io.Writer) *Writer {
104z, _ := NewWriterLevel(w, DefaultCompression)
105return z
106}
107
108// NewWriterLevel is like NewWriter but specifies the compression level instead
109// of assuming DefaultCompression.
110//
111// The compression level can be DefaultCompression, NoCompression, or any
112// integer value between BestSpeed and BestCompression inclusive. The error
113// returned will be nil if the level is valid.
114func NewWriterLevel(w io.Writer, level int) (*Writer, error) {
115if level < ConstantCompression || level > BestCompression {
116return nil, fmt.Errorf("gzip: invalid compression level: %d", level)
117}
118z := new(Writer)
119z.SetConcurrency(defaultBlockSize, runtime.GOMAXPROCS(0))
120z.init(w, level)
121return z, nil
122}
123
124// This function must be used by goroutines to set an
125// error condition, since z.err access is restricted
126// to the callers goruotine.
127func (z *Writer) pushError(err error) {
128z.errMu.Lock()
129if z.err != nil {
130z.errMu.Unlock()
131return
132}
133z.err = err
134close(z.pushedErr)
135z.errMu.Unlock()
136}
137
138func (z *Writer) init(w io.Writer, level int) {
139z.wg.Wait()
140digest := z.digest
141if digest != nil {
142digest.Reset()
143} else {
144digest = crc32.NewIEEE()
145}
146z.Header = Header{OS: 255}
147z.w = w
148z.level = level
149z.digest = digest
150z.pushedErr = make(chan struct{}, 0)
151z.results = make(chan result, z.blocks)
152z.err = nil
153z.closed = false
154z.Comment = ""
155z.Extra = nil
156z.ModTime = time.Time{}
157z.wroteHeader = false
158z.currentBuffer = nil
159z.buf = [10]byte{}
160z.prevTail = nil
161z.size = 0
162if z.dictFlatePool.New == nil {
163z.dictFlatePool.New = func() interface{} {
164f, _ := flate.NewWriterDict(w, level, nil)
165return f
166}
167}
168}
169
170// Reset discards the Writer z's state and makes it equivalent to the
171// result of its original state from NewWriter or NewWriterLevel, but
172// writing to w instead. This permits reusing a Writer rather than
173// allocating a new one.
174func (z *Writer) Reset(w io.Writer) {
175if z.results != nil && !z.closed {
176close(z.results)
177}
178z.SetConcurrency(defaultBlockSize, runtime.GOMAXPROCS(0))
179z.init(w, z.level)
180}
181
// put2 stores v in p[0:2] in little-endian byte order, the order used by
// GZIP (RFC 1952), unlike ZLIB (RFC 1950).
func put2(p []byte, v uint16) {
	lo, hi := byte(v), byte(v>>8)
	p[0] = lo
	p[1] = hi
}
187
// put4 stores v in p[0:4] in little-endian byte order.
func put4(p []byte, v uint32) {
	b0, b1, b2, b3 := byte(v), byte(v>>8), byte(v>>16), byte(v>>24)
	p[0] = b0
	p[1] = b1
	p[2] = b2
	p[3] = b3
}
194
195// writeBytes writes a length-prefixed byte slice to z.w.
196func (z *Writer) writeBytes(b []byte) error {
197if len(b) > 0xffff {
198return errors.New("gzip.Write: Extra data is too large")
199}
200put2(z.buf[0:2], uint16(len(b)))
201_, err := z.w.Write(z.buf[0:2])
202if err != nil {
203return err
204}
205_, err = z.w.Write(b)
206return err
207}
208
209// writeString writes a UTF-8 string s in GZIP's format to z.w.
210// GZIP (RFC 1952) specifies that strings are NUL-terminated ISO 8859-1 (Latin-1).
211func (z *Writer) writeString(s string) (err error) {
212// GZIP stores Latin-1 strings; error if non-Latin-1; convert if non-ASCII.
213needconv := false
214for _, v := range s {
215if v == 0 || v > 0xff {
216return errors.New("gzip.Write: non-Latin-1 header string")
217}
218if v > 0x7f {
219needconv = true
220}
221}
222if needconv {
223b := make([]byte, 0, len(s))
224for _, v := range s {
225b = append(b, byte(v))
226}
227_, err = z.w.Write(b)
228} else {
229_, err = io.WriteString(z.w, s)
230}
231if err != nil {
232return err
233}
234// GZIP strings are NUL-terminated.
235z.buf[0] = 0
236_, err = z.w.Write(z.buf[0:1])
237return err
238}
239
240// compressCurrent will compress the data currently buffered
241// This should only be called from the main writer/flush/closer
242func (z *Writer) compressCurrent(flush bool) {
243c := z.currentBuffer
244if len(c) > z.blockSize {
245// This can never happen through the public interface.
246panic("len(z.currentBuffer) > z.blockSize (most likely due to concurrent Write race)")
247}
248
249r := result{}
250r.result = make(chan []byte, 1)
251r.notifyWritten = make(chan struct{}, 0)
252// Reserve a result slot
253select {
254case z.results <- r:
255case <-z.pushedErr:
256return
257}
258
259z.wg.Add(1)
260tail := z.prevTail
261if len(c) > tailSize {
262buf := z.dstPool.Get().([]byte) // Put in .compressBlock
263// Copy tail from current buffer before handing the buffer over to the
264// compressBlock goroutine.
265buf = append(buf[:0], c[len(c)-tailSize:]...)
266z.prevTail = buf
267} else {
268z.prevTail = nil
269}
270go z.compressBlock(c, tail, r, z.closed)
271
272z.currentBuffer = z.dstPool.Get().([]byte) // Put in .compressBlock
273z.currentBuffer = z.currentBuffer[:0]
274
275// Wait if flushing
276if flush {
277<-r.notifyWritten
278}
279}
280
281// Returns an error if it has been set.
282// Cannot be used by functions that are from internal goroutines.
283func (z *Writer) checkError() error {
284z.errMu.RLock()
285err := z.err
286z.errMu.RUnlock()
287return err
288}
289
290// Write writes a compressed form of p to the underlying io.Writer. The
291// compressed bytes are not necessarily flushed to output until
292// the Writer is closed or Flush() is called.
293//
294// The function will return quickly, if there are unused buffers.
295// The sent slice (p) is copied, and the caller is free to re-use the buffer
296// when the function returns.
297//
298// Errors that occur during compression will be reported later, and a nil error
299// does not signify that the compression succeeded (since it is most likely still running)
300// That means that the call that returns an error may not be the call that caused it.
301// Only Flush and Close functions are guaranteed to return any errors up to that point.
302func (z *Writer) Write(p []byte) (int, error) {
303if err := z.checkError(); err != nil {
304return 0, err
305}
306// Write the GZIP header lazily.
307if !z.wroteHeader {
308z.wroteHeader = true
309z.buf[0] = gzipID1
310z.buf[1] = gzipID2
311z.buf[2] = gzipDeflate
312z.buf[3] = 0
313if z.Extra != nil {
314z.buf[3] |= 0x04
315}
316if z.Name != "" {
317z.buf[3] |= 0x08
318}
319if z.Comment != "" {
320z.buf[3] |= 0x10
321}
322put4(z.buf[4:8], uint32(z.ModTime.Unix()))
323if z.level == BestCompression {
324z.buf[8] = 2
325} else if z.level == BestSpeed {
326z.buf[8] = 4
327} else {
328z.buf[8] = 0
329}
330z.buf[9] = z.OS
331var n int
332var err error
333n, err = z.w.Write(z.buf[0:10])
334if err != nil {
335z.pushError(err)
336return n, err
337}
338if z.Extra != nil {
339err = z.writeBytes(z.Extra)
340if err != nil {
341z.pushError(err)
342return n, err
343}
344}
345if z.Name != "" {
346err = z.writeString(z.Name)
347if err != nil {
348z.pushError(err)
349return n, err
350}
351}
352if z.Comment != "" {
353err = z.writeString(z.Comment)
354if err != nil {
355z.pushError(err)
356return n, err
357}
358}
359// Start receiving data from compressors
360go func() {
361listen := z.results
362var failed bool
363for {
364r, ok := <-listen
365// If closed, we are finished.
366if !ok {
367return
368}
369if failed {
370close(r.notifyWritten)
371continue
372}
373buf := <-r.result
374n, err := z.w.Write(buf)
375if err != nil {
376z.pushError(err)
377close(r.notifyWritten)
378failed = true
379continue
380}
381if n != len(buf) {
382z.pushError(fmt.Errorf("gzip: short write %d should be %d", n, len(buf)))
383failed = true
384close(r.notifyWritten)
385continue
386}
387z.dstPool.Put(buf)
388close(r.notifyWritten)
389}
390}()
391z.currentBuffer = z.dstPool.Get().([]byte)
392z.currentBuffer = z.currentBuffer[:0]
393}
394q := p
395for len(q) > 0 {
396length := len(q)
397if length+len(z.currentBuffer) > z.blockSize {
398length = z.blockSize - len(z.currentBuffer)
399}
400z.digest.Write(q[:length])
401z.currentBuffer = append(z.currentBuffer, q[:length]...)
402if len(z.currentBuffer) > z.blockSize {
403panic("z.currentBuffer too large (most likely due to concurrent Write race)")
404}
405if len(z.currentBuffer) == z.blockSize {
406z.compressCurrent(false)
407if err := z.checkError(); err != nil {
408return len(p) - len(q), err
409}
410}
411z.size += length
412q = q[length:]
413}
414return len(p), z.checkError()
415}
416
417// Step 1: compresses buffer to buffer
418// Step 2: send writer to channel
419// Step 3: Close result channel to indicate we are done
420func (z *Writer) compressBlock(p, prevTail []byte, r result, closed bool) {
421defer func() {
422close(r.result)
423z.wg.Done()
424}()
425buf := z.dstPool.Get().([]byte) // Corresponding Put in .Write's result writer
426dest := bytes.NewBuffer(buf[:0])
427
428compressor := z.dictFlatePool.Get().(*flate.Writer) // Put below
429compressor.ResetDict(dest, prevTail)
430compressor.Write(p)
431z.dstPool.Put(p) // Corresponding Get in .Write and .compressCurrent
432
433err := compressor.Flush()
434if err != nil {
435z.pushError(err)
436return
437}
438if closed {
439err = compressor.Close()
440if err != nil {
441z.pushError(err)
442return
443}
444}
445z.dictFlatePool.Put(compressor) // Get above
446
447if prevTail != nil {
448z.dstPool.Put(prevTail) // Get in .compressCurrent
449}
450
451// Read back buffer
452buf = dest.Bytes()
453r.result <- buf
454}
455
456// Flush flushes any pending compressed data to the underlying writer.
457//
458// It is useful mainly in compressed network protocols, to ensure that
459// a remote reader has enough data to reconstruct a packet. Flush does
460// not return until the data has been written. If the underlying
461// writer returns an error, Flush returns that error.
462//
463// In the terminology of the zlib library, Flush is equivalent to Z_SYNC_FLUSH.
464func (z *Writer) Flush() error {
465if err := z.checkError(); err != nil {
466return err
467}
468if z.closed {
469return nil
470}
471if !z.wroteHeader {
472_, err := z.Write(nil)
473if err != nil {
474return err
475}
476}
477// We send current block to compression
478z.compressCurrent(true)
479
480return z.checkError()
481}
482
483// UncompressedSize will return the number of bytes written.
484// pgzip only, not a function in the official gzip package.
485func (z *Writer) UncompressedSize() int {
486return z.size
487}
488
489// Close closes the Writer, flushing any unwritten data to the underlying
490// io.Writer, but does not close the underlying io.Writer.
491func (z *Writer) Close() error {
492if err := z.checkError(); err != nil {
493return err
494}
495if z.closed {
496return nil
497}
498
499z.closed = true
500if !z.wroteHeader {
501z.Write(nil)
502if err := z.checkError(); err != nil {
503return err
504}
505}
506z.compressCurrent(true)
507if err := z.checkError(); err != nil {
508return err
509}
510close(z.results)
511put4(z.buf[0:4], z.digest.Sum32())
512put4(z.buf[4:8], uint32(z.size))
513_, err := z.w.Write(z.buf[0:8])
514if err != nil {
515z.pushError(err)
516return err
517}
518return nil
519}
520