cubefs
194 строки · 4.5 Кб
1package sarama
2
3import (
4"bytes"
5"compress/gzip"
6"fmt"
7"sync"
8
9snappy "github.com/eapache/go-xerial-snappy"
10"github.com/pierrec/lz4"
11)
12
13var (
14lz4WriterPool = sync.Pool{
15New: func() interface{} {
16return lz4.NewWriter(nil)
17},
18}
19
20gzipWriterPool = sync.Pool{
21New: func() interface{} {
22return gzip.NewWriter(nil)
23},
24}
25gzipWriterPoolForCompressionLevel1 = sync.Pool{
26New: func() interface{} {
27gz, err := gzip.NewWriterLevel(nil, 1)
28if err != nil {
29panic(err)
30}
31return gz
32},
33}
34gzipWriterPoolForCompressionLevel2 = sync.Pool{
35New: func() interface{} {
36gz, err := gzip.NewWriterLevel(nil, 2)
37if err != nil {
38panic(err)
39}
40return gz
41},
42}
43gzipWriterPoolForCompressionLevel3 = sync.Pool{
44New: func() interface{} {
45gz, err := gzip.NewWriterLevel(nil, 3)
46if err != nil {
47panic(err)
48}
49return gz
50},
51}
52gzipWriterPoolForCompressionLevel4 = sync.Pool{
53New: func() interface{} {
54gz, err := gzip.NewWriterLevel(nil, 4)
55if err != nil {
56panic(err)
57}
58return gz
59},
60}
61gzipWriterPoolForCompressionLevel5 = sync.Pool{
62New: func() interface{} {
63gz, err := gzip.NewWriterLevel(nil, 5)
64if err != nil {
65panic(err)
66}
67return gz
68},
69}
70gzipWriterPoolForCompressionLevel6 = sync.Pool{
71New: func() interface{} {
72gz, err := gzip.NewWriterLevel(nil, 6)
73if err != nil {
74panic(err)
75}
76return gz
77},
78}
79gzipWriterPoolForCompressionLevel7 = sync.Pool{
80New: func() interface{} {
81gz, err := gzip.NewWriterLevel(nil, 7)
82if err != nil {
83panic(err)
84}
85return gz
86},
87}
88gzipWriterPoolForCompressionLevel8 = sync.Pool{
89New: func() interface{} {
90gz, err := gzip.NewWriterLevel(nil, 8)
91if err != nil {
92panic(err)
93}
94return gz
95},
96}
97gzipWriterPoolForCompressionLevel9 = sync.Pool{
98New: func() interface{} {
99gz, err := gzip.NewWriterLevel(nil, 9)
100if err != nil {
101panic(err)
102}
103return gz
104},
105}
106)
107
108func compress(cc CompressionCodec, level int, data []byte) ([]byte, error) {
109switch cc {
110case CompressionNone:
111return data, nil
112case CompressionGZIP:
113var (
114err error
115buf bytes.Buffer
116writer *gzip.Writer
117)
118
119switch level {
120case CompressionLevelDefault:
121writer = gzipWriterPool.Get().(*gzip.Writer)
122defer gzipWriterPool.Put(writer)
123writer.Reset(&buf)
124case 1:
125writer = gzipWriterPoolForCompressionLevel1.Get().(*gzip.Writer)
126defer gzipWriterPoolForCompressionLevel1.Put(writer)
127writer.Reset(&buf)
128case 2:
129writer = gzipWriterPoolForCompressionLevel2.Get().(*gzip.Writer)
130defer gzipWriterPoolForCompressionLevel2.Put(writer)
131writer.Reset(&buf)
132case 3:
133writer = gzipWriterPoolForCompressionLevel3.Get().(*gzip.Writer)
134defer gzipWriterPoolForCompressionLevel3.Put(writer)
135writer.Reset(&buf)
136case 4:
137writer = gzipWriterPoolForCompressionLevel4.Get().(*gzip.Writer)
138defer gzipWriterPoolForCompressionLevel4.Put(writer)
139writer.Reset(&buf)
140case 5:
141writer = gzipWriterPoolForCompressionLevel5.Get().(*gzip.Writer)
142defer gzipWriterPoolForCompressionLevel5.Put(writer)
143writer.Reset(&buf)
144case 6:
145writer = gzipWriterPoolForCompressionLevel6.Get().(*gzip.Writer)
146defer gzipWriterPoolForCompressionLevel6.Put(writer)
147writer.Reset(&buf)
148case 7:
149writer = gzipWriterPoolForCompressionLevel7.Get().(*gzip.Writer)
150defer gzipWriterPoolForCompressionLevel7.Put(writer)
151writer.Reset(&buf)
152case 8:
153writer = gzipWriterPoolForCompressionLevel8.Get().(*gzip.Writer)
154defer gzipWriterPoolForCompressionLevel8.Put(writer)
155writer.Reset(&buf)
156case 9:
157writer = gzipWriterPoolForCompressionLevel9.Get().(*gzip.Writer)
158defer gzipWriterPoolForCompressionLevel9.Put(writer)
159writer.Reset(&buf)
160default:
161writer, err = gzip.NewWriterLevel(&buf, level)
162if err != nil {
163return nil, err
164}
165}
166if _, err := writer.Write(data); err != nil {
167return nil, err
168}
169if err := writer.Close(); err != nil {
170return nil, err
171}
172return buf.Bytes(), nil
173case CompressionSnappy:
174return snappy.Encode(data), nil
175case CompressionLZ4:
176writer := lz4WriterPool.Get().(*lz4.Writer)
177defer lz4WriterPool.Put(writer)
178
179var buf bytes.Buffer
180writer.Reset(&buf)
181
182if _, err := writer.Write(data); err != nil {
183return nil, err
184}
185if err := writer.Close(); err != nil {
186return nil, err
187}
188return buf.Bytes(), nil
189case CompressionZSTD:
190return zstdCompress(ZstdEncoderParams{level}, nil, data)
191default:
192return nil, PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", cc)}
193}
194}
195