podman

Форк
0
519 строк · 12.9 Кб
1
// Copyright 2010 The Go Authors. All rights reserved.
2
// Use of this source code is governed by a BSD-style
3
// license that can be found in the LICENSE file.
4

5
package pgzip
6

7
import (
8
	"bytes"
9
	"errors"
10
	"fmt"
11
	"hash"
12
	"hash/crc32"
13
	"io"
14
	"runtime"
15
	"sync"
16
	"time"
17

18
	"github.com/klauspost/compress/flate"
19
)
20

21
const (
22
	defaultBlockSize = 1 << 20
23
	tailSize         = 16384
24
	defaultBlocks    = 4
25
)
26

27
// These constants are copied from the flate package, so that code that imports
28
// "compress/gzip" does not also have to import "compress/flate".
29
const (
30
	NoCompression       = flate.NoCompression
31
	BestSpeed           = flate.BestSpeed
32
	BestCompression     = flate.BestCompression
33
	DefaultCompression  = flate.DefaultCompression
34
	ConstantCompression = flate.ConstantCompression
35
	HuffmanOnly         = flate.HuffmanOnly
36
)
37

38
// A Writer is an io.WriteCloser.
39
// Writes to a Writer are compressed and written to w.
40
type Writer struct {
41
	Header
42
	w             io.Writer
43
	level         int
44
	wroteHeader   bool
45
	blockSize     int
46
	blocks        int
47
	currentBuffer []byte
48
	prevTail      []byte
49
	digest        hash.Hash32
50
	size          int
51
	closed        bool
52
	buf           [10]byte
53
	errMu         sync.RWMutex
54
	err           error
55
	pushedErr     chan struct{}
56
	results       chan result
57
	dictFlatePool sync.Pool
58
	dstPool       sync.Pool
59
	wg            sync.WaitGroup
60
}
61

62
type result struct {
63
	result        chan []byte
64
	notifyWritten chan struct{}
65
}
66

67
// Use SetConcurrency to finetune the concurrency level if needed.
68
//
69
// With this you can control the approximate size of your blocks,
70
// as well as how many you want to be processing in parallel.
71
//
72
// Default values for this is SetConcurrency(defaultBlockSize, runtime.GOMAXPROCS(0)),
73
// meaning blocks are split at 1 MB and up to the number of CPU threads
74
// can be processing at once before the writer blocks.
75
func (z *Writer) SetConcurrency(blockSize, blocks int) error {
76
	if blockSize <= tailSize {
77
		return fmt.Errorf("gzip: block size cannot be less than or equal to %d", tailSize)
78
	}
79
	if blocks <= 0 {
80
		return errors.New("gzip: blocks cannot be zero or less")
81
	}
82
	if blockSize == z.blockSize && blocks == z.blocks {
83
		return nil
84
	}
85
	z.blockSize = blockSize
86
	z.results = make(chan result, blocks)
87
	z.blocks = blocks
88
	z.dstPool.New = func() interface{} { return make([]byte, 0, blockSize+(blockSize)>>4) }
89
	return nil
90
}
91

92
// NewWriter returns a new Writer.
93
// Writes to the returned writer are compressed and written to w.
94
//
95
// It is the caller's responsibility to call Close on the WriteCloser when done.
96
// Writes may be buffered and not flushed until Close.
97
//
98
// Callers that wish to set the fields in Writer.Header must do so before
99
// the first call to Write or Close. The Comment and Name header fields are
100
// UTF-8 strings in Go, but the underlying format requires NUL-terminated ISO
101
// 8859-1 (Latin-1). NUL or non-Latin-1 runes in those strings will lead to an
102
// error on Write.
103
func NewWriter(w io.Writer) *Writer {
104
	z, _ := NewWriterLevel(w, DefaultCompression)
105
	return z
106
}
107

108
// NewWriterLevel is like NewWriter but specifies the compression level instead
109
// of assuming DefaultCompression.
110
//
111
// The compression level can be DefaultCompression, NoCompression, or any
112
// integer value between BestSpeed and BestCompression inclusive. The error
113
// returned will be nil if the level is valid.
114
func NewWriterLevel(w io.Writer, level int) (*Writer, error) {
115
	if level < ConstantCompression || level > BestCompression {
116
		return nil, fmt.Errorf("gzip: invalid compression level: %d", level)
117
	}
118
	z := new(Writer)
119
	z.SetConcurrency(defaultBlockSize, runtime.GOMAXPROCS(0))
120
	z.init(w, level)
121
	return z, nil
122
}
123

124
// This function must be used by goroutines to set an
125
// error condition, since z.err access is restricted
126
// to the callers goruotine.
127
func (z *Writer) pushError(err error) {
128
	z.errMu.Lock()
129
	if z.err != nil {
130
		z.errMu.Unlock()
131
		return
132
	}
133
	z.err = err
134
	close(z.pushedErr)
135
	z.errMu.Unlock()
136
}
137

138
func (z *Writer) init(w io.Writer, level int) {
139
	z.wg.Wait()
140
	digest := z.digest
141
	if digest != nil {
142
		digest.Reset()
143
	} else {
144
		digest = crc32.NewIEEE()
145
	}
146
	z.Header = Header{OS: 255}
147
	z.w = w
148
	z.level = level
149
	z.digest = digest
150
	z.pushedErr = make(chan struct{}, 0)
151
	z.results = make(chan result, z.blocks)
152
	z.err = nil
153
	z.closed = false
154
	z.Comment = ""
155
	z.Extra = nil
156
	z.ModTime = time.Time{}
157
	z.wroteHeader = false
158
	z.currentBuffer = nil
159
	z.buf = [10]byte{}
160
	z.prevTail = nil
161
	z.size = 0
162
	if z.dictFlatePool.New == nil {
163
		z.dictFlatePool.New = func() interface{} {
164
			f, _ := flate.NewWriterDict(w, level, nil)
165
			return f
166
		}
167
	}
168
}
169

170
// Reset discards the Writer z's state and makes it equivalent to the
171
// result of its original state from NewWriter or NewWriterLevel, but
172
// writing to w instead. This permits reusing a Writer rather than
173
// allocating a new one.
174
func (z *Writer) Reset(w io.Writer) {
175
	if z.results != nil && !z.closed {
176
		close(z.results)
177
	}
178
	z.SetConcurrency(defaultBlockSize, runtime.GOMAXPROCS(0))
179
	z.init(w, z.level)
180
}
181

182
// GZIP (RFC 1952) is little-endian, unlike ZLIB (RFC 1950).
183
func put2(p []byte, v uint16) {
184
	p[0] = uint8(v >> 0)
185
	p[1] = uint8(v >> 8)
186
}
187

188
func put4(p []byte, v uint32) {
189
	p[0] = uint8(v >> 0)
190
	p[1] = uint8(v >> 8)
191
	p[2] = uint8(v >> 16)
192
	p[3] = uint8(v >> 24)
193
}
194

195
// writeBytes writes a length-prefixed byte slice to z.w.
196
func (z *Writer) writeBytes(b []byte) error {
197
	if len(b) > 0xffff {
198
		return errors.New("gzip.Write: Extra data is too large")
199
	}
200
	put2(z.buf[0:2], uint16(len(b)))
201
	_, err := z.w.Write(z.buf[0:2])
202
	if err != nil {
203
		return err
204
	}
205
	_, err = z.w.Write(b)
206
	return err
207
}
208

209
// writeString writes a UTF-8 string s in GZIP's format to z.w.
210
// GZIP (RFC 1952) specifies that strings are NUL-terminated ISO 8859-1 (Latin-1).
211
func (z *Writer) writeString(s string) (err error) {
212
	// GZIP stores Latin-1 strings; error if non-Latin-1; convert if non-ASCII.
213
	needconv := false
214
	for _, v := range s {
215
		if v == 0 || v > 0xff {
216
			return errors.New("gzip.Write: non-Latin-1 header string")
217
		}
218
		if v > 0x7f {
219
			needconv = true
220
		}
221
	}
222
	if needconv {
223
		b := make([]byte, 0, len(s))
224
		for _, v := range s {
225
			b = append(b, byte(v))
226
		}
227
		_, err = z.w.Write(b)
228
	} else {
229
		_, err = io.WriteString(z.w, s)
230
	}
231
	if err != nil {
232
		return err
233
	}
234
	// GZIP strings are NUL-terminated.
235
	z.buf[0] = 0
236
	_, err = z.w.Write(z.buf[0:1])
237
	return err
238
}
239

240
// compressCurrent will compress the data currently buffered
241
// This should only be called from the main writer/flush/closer
242
func (z *Writer) compressCurrent(flush bool) {
243
	c := z.currentBuffer
244
	if len(c) > z.blockSize {
245
		// This can never happen through the public interface.
246
		panic("len(z.currentBuffer) > z.blockSize (most likely due to concurrent Write race)")
247
	}
248

249
	r := result{}
250
	r.result = make(chan []byte, 1)
251
	r.notifyWritten = make(chan struct{}, 0)
252
	// Reserve a result slot
253
	select {
254
	case z.results <- r:
255
	case <-z.pushedErr:
256
		return
257
	}
258

259
	z.wg.Add(1)
260
	tail := z.prevTail
261
	if len(c) > tailSize {
262
		buf := z.dstPool.Get().([]byte) // Put in .compressBlock
263
		// Copy tail from current buffer before handing the buffer over to the
264
		// compressBlock goroutine.
265
		buf = append(buf[:0], c[len(c)-tailSize:]...)
266
		z.prevTail = buf
267
	} else {
268
		z.prevTail = nil
269
	}
270
	go z.compressBlock(c, tail, r, z.closed)
271

272
	z.currentBuffer = z.dstPool.Get().([]byte) // Put in .compressBlock
273
	z.currentBuffer = z.currentBuffer[:0]
274

275
	// Wait if flushing
276
	if flush {
277
		<-r.notifyWritten
278
	}
279
}
280

281
// Returns an error if it has been set.
282
// Cannot be used by functions that are from internal goroutines.
283
func (z *Writer) checkError() error {
284
	z.errMu.RLock()
285
	err := z.err
286
	z.errMu.RUnlock()
287
	return err
288
}
289

290
// Write writes a compressed form of p to the underlying io.Writer. The
291
// compressed bytes are not necessarily flushed to output until
292
// the Writer is closed or Flush() is called.
293
//
294
// The function will return quickly, if there are unused buffers.
295
// The sent slice (p) is copied, and the caller is free to re-use the buffer
296
// when the function returns.
297
//
298
// Errors that occur during compression will be reported later, and a nil error
299
// does not signify that the compression succeeded (since it is most likely still running)
300
// That means that the call that returns an error may not be the call that caused it.
301
// Only Flush and Close functions are guaranteed to return any errors up to that point.
302
func (z *Writer) Write(p []byte) (int, error) {
303
	if err := z.checkError(); err != nil {
304
		return 0, err
305
	}
306
	// Write the GZIP header lazily.
307
	if !z.wroteHeader {
308
		z.wroteHeader = true
309
		z.buf[0] = gzipID1
310
		z.buf[1] = gzipID2
311
		z.buf[2] = gzipDeflate
312
		z.buf[3] = 0
313
		if z.Extra != nil {
314
			z.buf[3] |= 0x04
315
		}
316
		if z.Name != "" {
317
			z.buf[3] |= 0x08
318
		}
319
		if z.Comment != "" {
320
			z.buf[3] |= 0x10
321
		}
322
		put4(z.buf[4:8], uint32(z.ModTime.Unix()))
323
		if z.level == BestCompression {
324
			z.buf[8] = 2
325
		} else if z.level == BestSpeed {
326
			z.buf[8] = 4
327
		} else {
328
			z.buf[8] = 0
329
		}
330
		z.buf[9] = z.OS
331
		var n int
332
		var err error
333
		n, err = z.w.Write(z.buf[0:10])
334
		if err != nil {
335
			z.pushError(err)
336
			return n, err
337
		}
338
		if z.Extra != nil {
339
			err = z.writeBytes(z.Extra)
340
			if err != nil {
341
				z.pushError(err)
342
				return n, err
343
			}
344
		}
345
		if z.Name != "" {
346
			err = z.writeString(z.Name)
347
			if err != nil {
348
				z.pushError(err)
349
				return n, err
350
			}
351
		}
352
		if z.Comment != "" {
353
			err = z.writeString(z.Comment)
354
			if err != nil {
355
				z.pushError(err)
356
				return n, err
357
			}
358
		}
359
		// Start receiving data from compressors
360
		go func() {
361
			listen := z.results
362
			var failed bool
363
			for {
364
				r, ok := <-listen
365
				// If closed, we are finished.
366
				if !ok {
367
					return
368
				}
369
				if failed {
370
					close(r.notifyWritten)
371
					continue
372
				}
373
				buf := <-r.result
374
				n, err := z.w.Write(buf)
375
				if err != nil {
376
					z.pushError(err)
377
					close(r.notifyWritten)
378
					failed = true
379
					continue
380
				}
381
				if n != len(buf) {
382
					z.pushError(fmt.Errorf("gzip: short write %d should be %d", n, len(buf)))
383
					failed = true
384
					close(r.notifyWritten)
385
					continue
386
				}
387
				z.dstPool.Put(buf)
388
				close(r.notifyWritten)
389
			}
390
		}()
391
		z.currentBuffer = z.dstPool.Get().([]byte)
392
		z.currentBuffer = z.currentBuffer[:0]
393
	}
394
	q := p
395
	for len(q) > 0 {
396
		length := len(q)
397
		if length+len(z.currentBuffer) > z.blockSize {
398
			length = z.blockSize - len(z.currentBuffer)
399
		}
400
		z.digest.Write(q[:length])
401
		z.currentBuffer = append(z.currentBuffer, q[:length]...)
402
		if len(z.currentBuffer) > z.blockSize {
403
			panic("z.currentBuffer too large (most likely due to concurrent Write race)")
404
		}
405
		if len(z.currentBuffer) == z.blockSize {
406
			z.compressCurrent(false)
407
			if err := z.checkError(); err != nil {
408
				return len(p) - len(q), err
409
			}
410
		}
411
		z.size += length
412
		q = q[length:]
413
	}
414
	return len(p), z.checkError()
415
}
416

417
// Step 1: compresses buffer to buffer
418
// Step 2: send writer to channel
419
// Step 3: Close result channel to indicate we are done
420
func (z *Writer) compressBlock(p, prevTail []byte, r result, closed bool) {
421
	defer func() {
422
		close(r.result)
423
		z.wg.Done()
424
	}()
425
	buf := z.dstPool.Get().([]byte) // Corresponding Put in .Write's result writer
426
	dest := bytes.NewBuffer(buf[:0])
427

428
	compressor := z.dictFlatePool.Get().(*flate.Writer) // Put below
429
	compressor.ResetDict(dest, prevTail)
430
	compressor.Write(p)
431
	z.dstPool.Put(p) // Corresponding Get in .Write and .compressCurrent
432

433
	err := compressor.Flush()
434
	if err != nil {
435
		z.pushError(err)
436
		return
437
	}
438
	if closed {
439
		err = compressor.Close()
440
		if err != nil {
441
			z.pushError(err)
442
			return
443
		}
444
	}
445
	z.dictFlatePool.Put(compressor) // Get above
446

447
	if prevTail != nil {
448
		z.dstPool.Put(prevTail) // Get in .compressCurrent
449
	}
450

451
	// Read back buffer
452
	buf = dest.Bytes()
453
	r.result <- buf
454
}
455

456
// Flush flushes any pending compressed data to the underlying writer.
457
//
458
// It is useful mainly in compressed network protocols, to ensure that
459
// a remote reader has enough data to reconstruct a packet. Flush does
460
// not return until the data has been written. If the underlying
461
// writer returns an error, Flush returns that error.
462
//
463
// In the terminology of the zlib library, Flush is equivalent to Z_SYNC_FLUSH.
464
func (z *Writer) Flush() error {
465
	if err := z.checkError(); err != nil {
466
		return err
467
	}
468
	if z.closed {
469
		return nil
470
	}
471
	if !z.wroteHeader {
472
		_, err := z.Write(nil)
473
		if err != nil {
474
			return err
475
		}
476
	}
477
	// We send current block to compression
478
	z.compressCurrent(true)
479

480
	return z.checkError()
481
}
482

483
// UncompressedSize will return the number of bytes written.
484
// pgzip only, not a function in the official gzip package.
485
func (z *Writer) UncompressedSize() int {
486
	return z.size
487
}
488

489
// Close closes the Writer, flushing any unwritten data to the underlying
490
// io.Writer, but does not close the underlying io.Writer.
491
func (z *Writer) Close() error {
492
	if err := z.checkError(); err != nil {
493
		return err
494
	}
495
	if z.closed {
496
		return nil
497
	}
498

499
	z.closed = true
500
	if !z.wroteHeader {
501
		z.Write(nil)
502
		if err := z.checkError(); err != nil {
503
			return err
504
		}
505
	}
506
	z.compressCurrent(true)
507
	if err := z.checkError(); err != nil {
508
		return err
509
	}
510
	close(z.results)
511
	put4(z.buf[0:4], z.digest.Sum32())
512
	put4(z.buf[4:8], uint32(z.size))
513
	_, err := z.w.Write(z.buf[0:8])
514
	if err != nil {
515
		z.pushError(err)
516
		return err
517
	}
518
	return nil
519
}
520

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.