1
// Copyright 2019+ Klaus Post. All rights reserved.
2
// License information can be found in the LICENSE file.
3
// Based on work by Yann Collet, released under BSD License.
11
rdebug "runtime/debug"
14
"github.com/klauspost/compress/zstd/internal/xxhash"
17
// Encoder provides encoding to Zstandard.
18
// An Encoder can be used for either compressing a stream via the
19
// io.WriteCloser interface supported by the Encoder or as multiple independent
20
// tasks via the EncodeAll function.
21
// Smaller encodes are encouraged to use the EncodeAll function.
22
// Use NewWriter to create a new instance.
30
type encoder interface {
31
Encode(blk *blockEnc, src []byte)
32
EncodeNoHist(blk *blockEnc, src []byte)
35
AppendCRC([]byte) []byte
36
WindowSize(size int64) int32
38
Reset(d *dict, singleBlock bool)
41
type encoderState struct {
52
frameContentSize int64
57
// This waitgroup indicates an encode is running.
59
// This waitgroup indicates we have a block encoding/writing.
63
// NewWriter will create a new Zstandard encoder.
64
// If the encoder will only be used for encoding blocks, a nil writer can be used.
65
func NewWriter(w io.Writer, opts ...EOption) (*Encoder, error) {
69
for _, o := range opts {
81
func (e *Encoder) initialize() {
82
if e.o.concurrent == 0 {
85
e.encoders = make(chan encoder, e.o.concurrent)
86
for i := 0; i < e.o.concurrent; i++ {
92
// Reset will re-initialize the writer and new writes will encode to the supplied writer
93
// as a new, independent stream.
94
func (e *Encoder) Reset(w io.Writer) {
98
if cap(s.filling) == 0 {
99
s.filling = make([]byte, 0, e.o.blockSize)
101
if e.o.concurrent > 1 {
102
if cap(s.current) == 0 {
103
s.current = make([]byte, 0, e.o.blockSize)
105
if cap(s.previous) == 0 {
106
s.previous = make([]byte, 0, e.o.blockSize)
108
s.current = s.current[:0]
109
s.previous = s.previous[:0]
110
if s.writing == nil {
111
s.writing = &blockEnc{lowMem: e.o.lowMem}
114
s.writing.initNewEncode()
116
if s.encoder == nil {
117
s.encoder = e.o.encoder()
119
s.filling = s.filling[:0]
120
s.encoder.Reset(e.o.dict, false)
121
s.headerWritten = false
123
s.fullFrameWritten = false
129
s.frameContentSize = 0
132
// ResetContentSize will reset and set a content size for the next stream.
133
// If the number of bytes written does not match the given size, an error will be returned
134
// when calling Close().
135
// This is removed when Reset is called.
136
// Sizes <= 0 result in no content size being set.
137
func (e *Encoder) ResetContentSize(w io.Writer, size int64) {
140
e.state.frameContentSize = size
144
// Write data to the encoder.
145
// Input data will be buffered and as the buffer fills up
146
// content will be compressed and written to the output.
147
// When done writing, use Close to flush the remaining output
148
// and write CRC if requested.
149
func (e *Encoder) Write(p []byte) (n int, err error) {
152
if len(p)+len(s.filling) < e.o.blockSize {
154
_, _ = s.encoder.CRC().Write(p)
156
s.filling = append(s.filling, p...)
157
return n + len(p), nil
160
if len(p)+len(s.filling) > e.o.blockSize {
161
add = add[:e.o.blockSize-len(s.filling)]
164
_, _ = s.encoder.CRC().Write(add)
166
s.filling = append(s.filling, add...)
169
if len(s.filling) < e.o.blockSize {
172
err := e.nextBlock(false)
176
if debugAsserts && len(s.filling) > 0 {
177
panic(len(s.filling))
183
// nextBlock will synchronize and start compressing input in e.state.filling.
184
// If an error has occurred during encoding it will be returned.
185
func (e *Encoder) nextBlock(final bool) error {
187
// Wait for current block.
192
if len(s.filling) > e.o.blockSize {
193
return fmt.Errorf("block > maxStoreBlockSize")
195
if !s.headerWritten {
196
// If we have a single block encode, do a sync compression.
197
if final && len(s.filling) == 0 && !e.o.fullZero {
198
s.headerWritten = true
199
s.fullFrameWritten = true
203
if final && len(s.filling) > 0 {
204
s.current = e.EncodeAll(s.filling, s.current[:0])
206
n2, s.err = s.w.Write(s.current)
210
s.nWritten += int64(n2)
211
s.nInput += int64(len(s.filling))
212
s.current = s.current[:0]
213
s.filling = s.filling[:0]
214
s.headerWritten = true
215
s.fullFrameWritten = true
220
var tmp [maxHeaderSize]byte
222
ContentSize: uint64(s.frameContentSize),
223
WindowSize: uint32(s.encoder.WindowSize(s.frameContentSize)),
224
SingleSegment: false,
226
DictID: e.o.dict.ID(),
229
dst, err := fh.appendTo(tmp[:0])
233
s.headerWritten = true
236
n2, s.err = s.w.Write(dst)
240
s.nWritten += int64(n2)
243
// Ensure we only write it once.
247
if len(s.filling) == 0 {
248
// Final block, but no data.
256
_, s.err = s.w.Write(blk.output)
257
s.nWritten += int64(len(blk.output))
264
if e.o.concurrent == 1 {
266
s.nInput += int64(len(s.filling))
268
println("Adding sync block,", len(src), "bytes, final:", final)
279
err := errIncompressible
280
// If we got the exact same number of literals as input,
281
// assume the literals cannot be compressed.
282
if len(src) != len(blk.literals) || len(src) != e.o.blockSize {
283
err = blk.encode(src, e.o.noEntropy, !e.o.allLitEntropy)
286
case errIncompressible:
288
println("Storing incompressible block as raw")
291
// In fast mode, we do not transfer offsets, so we don't have to deal with changing the offsets.
297
_, s.err = s.w.Write(blk.output)
298
s.nWritten += int64(len(blk.output))
299
s.filling = s.filling[:0]
303
// Move blocks forward.
304
s.filling, s.current, s.previous = s.previous[:0], s.filling, s.current
305
s.nInput += int64(len(s.current))
307
go func(src []byte) {
309
println("Adding block,", len(src), "bytes, final:", final)
312
if r := recover(); r != nil {
313
s.err = fmt.Errorf("panic while encoding: %v", r)
325
// Wait for pending writes.
327
if s.writeErr != nil {
331
// Transfer encoders from previous write block.
332
blk.swapEncoders(s.writing)
333
// Transfer recent offsets to next.
334
enc.UseBlock(s.writing)
339
if r := recover(); r != nil {
340
s.writeErr = fmt.Errorf("panic while encoding/writing: %v", r)
345
err := errIncompressible
346
// If we got the exact same number of literals as input,
347
// assume the literals cannot be compressed.
348
if len(src) != len(blk.literals) || len(src) != e.o.blockSize {
349
err = blk.encode(src, e.o.noEntropy, !e.o.allLitEntropy)
352
case errIncompressible:
354
println("Storing incompressible block as raw")
357
// In fast mode, we do not transfer offsets, so we don't have to deal with changing the offsets.
363
_, s.writeErr = s.w.Write(blk.output)
364
s.nWritten += int64(len(blk.output))
370
// ReadFrom reads data from r until EOF or error.
371
// The return value n is the number of bytes read.
372
// Any error except io.EOF encountered during the read is also returned.
374
// The Copy function uses ReaderFrom if available.
375
func (e *Encoder) ReadFrom(r io.Reader) (n int64, err error) {
377
println("Using ReadFrom")
380
// Flush any current writes.
381
if len(e.state.filling) > 0 {
382
if err := e.nextBlock(false); err != nil {
386
e.state.filling = e.state.filling[:e.o.blockSize]
387
src := e.state.filling
389
n2, err := r.Read(src)
391
_, _ = e.state.encoder.CRC().Write(src[:n2])
393
// src is now the unfilled part...
398
e.state.filling = e.state.filling[:len(e.state.filling)-len(src)]
400
println("ReadFrom: got EOF final block:", len(e.state.filling))
406
println("ReadFrom: got error:", err)
413
println("ReadFrom: got space left in source:", len(src))
417
err = e.nextBlock(false)
421
e.state.filling = e.state.filling[:e.o.blockSize]
422
src = e.state.filling
426
// Flush will send the currently written data to output
427
// and block until everything has been written.
428
// This should only be used on rare occasions where pushing the currently queued data is critical.
429
func (e *Encoder) Flush() error {
431
if len(s.filling) > 0 {
432
err := e.nextBlock(false)
445
// Close will flush the final output and close the stream.
446
// The function will block until everything has been written.
447
// The Encoder can still be re-used after calling this.
448
func (e *Encoder) Close() error {
450
if s.encoder == nil {
453
err := e.nextBlock(true)
457
if s.frameContentSize > 0 {
458
if s.nInput != s.frameContentSize {
459
return fmt.Errorf("frame content size %d given, but %d bytes was written", s.frameContentSize, s.nInput)
462
if e.state.fullFrameWritten {
471
if s.writeErr != nil {
476
if e.o.crc && s.err == nil {
479
_, s.err = s.w.Write(s.encoder.AppendCRC(tmp[:0]))
483
// Add padding with content from crypto/rand.Reader
484
if s.err == nil && e.o.pad > 0 {
485
add := calcSkippableFrame(s.nWritten, int64(e.o.pad))
486
frame, err := skippableFrame(s.filling[:0], add, rand.Reader)
490
_, s.err = s.w.Write(frame)
495
// EncodeAll will encode all input in src and append it to dst.
496
// This function can be called concurrently, but each call will only run on a single goroutine.
497
// If empty input is given, nothing is returned, unless WithZeroFrames is specified.
498
// Encoded blocks can be concatenated and the result will be the combined input stream.
499
// Data compressed with EncodeAll can be decoded with the Decoder,
500
// using either a stream or DecodeAll.
501
func (e *Encoder) EncodeAll(src, dst []byte) []byte {
507
WindowSize: MinWindowSize,
509
// Adding a checksum would be a waste of space.
513
dst, _ = fh.appendTo(dst)
515
// Write raw block as last one only.
518
blk.setType(blockTypeRaw)
520
dst = blk.appendTo(dst)
524
e.init.Do(e.initialize)
527
// Release encoder reference to last block.
528
// If a non-single block is needed the encoder will reset again.
531
// Use single segments when above minimum window and below 1MB.
532
single := len(src) < 1<<20 && len(src) > MinWindowSize
533
if e.o.single != nil {
537
ContentSize: uint64(len(src)),
538
WindowSize: uint32(enc.WindowSize(int64(len(src)))),
539
SingleSegment: single,
541
DictID: e.o.dict.ID(),
544
// If less than 1MB, allocate a buffer up front.
545
if len(dst) == 0 && cap(dst) == 0 && len(src) < 1<<20 && !e.o.lowMem {
546
dst = make([]byte, 0, len(src))
548
dst, err := fh.appendTo(dst)
553
// If we can do everything in one block, prefer that.
554
if len(src) <= maxCompressedBlockSize {
555
enc.Reset(e.o.dict, true)
556
// Slightly faster with no history and everything in one block.
558
_, _ = enc.CRC().Write(src)
563
enc.EncodeNoHist(blk, src)
568
// If we got the exact same number of literals as input,
569
// assume the literals cannot be compressed.
570
err := errIncompressible
572
if len(blk.literals) != len(src) || len(src) != e.o.blockSize {
573
// Output directly to dst
575
err = blk.encode(src, e.o.noEntropy, !e.o.allLitEntropy)
579
case errIncompressible:
581
println("Storing incompressible block as raw")
583
dst = blk.encodeRawTo(dst, src)
591
enc.Reset(e.o.dict, false)
595
if len(todo) > e.o.blockSize {
596
todo = todo[:e.o.blockSize]
598
src = src[len(todo):]
600
_, _ = enc.CRC().Write(todo)
603
enc.Encode(blk, todo)
607
err := errIncompressible
608
// If we got the exact same number of literals as input,
609
// assume the literals cannot be compressed.
610
if len(blk.literals) != len(todo) || len(todo) != e.o.blockSize {
611
err = blk.encode(todo, e.o.noEntropy, !e.o.allLitEntropy)
615
case errIncompressible:
617
println("Storing incompressible block as raw")
619
dst = blk.encodeRawTo(dst, todo)
622
dst = append(dst, blk.output...)
630
dst = enc.AppendCRC(dst)
632
// Add padding with content from crypto/rand.Reader
634
add := calcSkippableFrame(int64(len(dst)), int64(e.o.pad))
635
dst, err = skippableFrame(dst, add, rand.Reader)