cubefs

Форк
0
194 строки · 4.5 Кб
1
package sarama
2

3
import (
4
	"bytes"
5
	"compress/gzip"
6
	"fmt"
7
	"sync"
8

9
	snappy "github.com/eapache/go-xerial-snappy"
10
	"github.com/pierrec/lz4"
11
)
12

13
var (
14
	lz4WriterPool = sync.Pool{
15
		New: func() interface{} {
16
			return lz4.NewWriter(nil)
17
		},
18
	}
19

20
	gzipWriterPool = sync.Pool{
21
		New: func() interface{} {
22
			return gzip.NewWriter(nil)
23
		},
24
	}
25
	gzipWriterPoolForCompressionLevel1 = sync.Pool{
26
		New: func() interface{} {
27
			gz, err := gzip.NewWriterLevel(nil, 1)
28
			if err != nil {
29
				panic(err)
30
			}
31
			return gz
32
		},
33
	}
34
	gzipWriterPoolForCompressionLevel2 = sync.Pool{
35
		New: func() interface{} {
36
			gz, err := gzip.NewWriterLevel(nil, 2)
37
			if err != nil {
38
				panic(err)
39
			}
40
			return gz
41
		},
42
	}
43
	gzipWriterPoolForCompressionLevel3 = sync.Pool{
44
		New: func() interface{} {
45
			gz, err := gzip.NewWriterLevel(nil, 3)
46
			if err != nil {
47
				panic(err)
48
			}
49
			return gz
50
		},
51
	}
52
	gzipWriterPoolForCompressionLevel4 = sync.Pool{
53
		New: func() interface{} {
54
			gz, err := gzip.NewWriterLevel(nil, 4)
55
			if err != nil {
56
				panic(err)
57
			}
58
			return gz
59
		},
60
	}
61
	gzipWriterPoolForCompressionLevel5 = sync.Pool{
62
		New: func() interface{} {
63
			gz, err := gzip.NewWriterLevel(nil, 5)
64
			if err != nil {
65
				panic(err)
66
			}
67
			return gz
68
		},
69
	}
70
	gzipWriterPoolForCompressionLevel6 = sync.Pool{
71
		New: func() interface{} {
72
			gz, err := gzip.NewWriterLevel(nil, 6)
73
			if err != nil {
74
				panic(err)
75
			}
76
			return gz
77
		},
78
	}
79
	gzipWriterPoolForCompressionLevel7 = sync.Pool{
80
		New: func() interface{} {
81
			gz, err := gzip.NewWriterLevel(nil, 7)
82
			if err != nil {
83
				panic(err)
84
			}
85
			return gz
86
		},
87
	}
88
	gzipWriterPoolForCompressionLevel8 = sync.Pool{
89
		New: func() interface{} {
90
			gz, err := gzip.NewWriterLevel(nil, 8)
91
			if err != nil {
92
				panic(err)
93
			}
94
			return gz
95
		},
96
	}
97
	gzipWriterPoolForCompressionLevel9 = sync.Pool{
98
		New: func() interface{} {
99
			gz, err := gzip.NewWriterLevel(nil, 9)
100
			if err != nil {
101
				panic(err)
102
			}
103
			return gz
104
		},
105
	}
106
)
107

108
func compress(cc CompressionCodec, level int, data []byte) ([]byte, error) {
109
	switch cc {
110
	case CompressionNone:
111
		return data, nil
112
	case CompressionGZIP:
113
		var (
114
			err    error
115
			buf    bytes.Buffer
116
			writer *gzip.Writer
117
		)
118

119
		switch level {
120
		case CompressionLevelDefault:
121
			writer = gzipWriterPool.Get().(*gzip.Writer)
122
			defer gzipWriterPool.Put(writer)
123
			writer.Reset(&buf)
124
		case 1:
125
			writer = gzipWriterPoolForCompressionLevel1.Get().(*gzip.Writer)
126
			defer gzipWriterPoolForCompressionLevel1.Put(writer)
127
			writer.Reset(&buf)
128
		case 2:
129
			writer = gzipWriterPoolForCompressionLevel2.Get().(*gzip.Writer)
130
			defer gzipWriterPoolForCompressionLevel2.Put(writer)
131
			writer.Reset(&buf)
132
		case 3:
133
			writer = gzipWriterPoolForCompressionLevel3.Get().(*gzip.Writer)
134
			defer gzipWriterPoolForCompressionLevel3.Put(writer)
135
			writer.Reset(&buf)
136
		case 4:
137
			writer = gzipWriterPoolForCompressionLevel4.Get().(*gzip.Writer)
138
			defer gzipWriterPoolForCompressionLevel4.Put(writer)
139
			writer.Reset(&buf)
140
		case 5:
141
			writer = gzipWriterPoolForCompressionLevel5.Get().(*gzip.Writer)
142
			defer gzipWriterPoolForCompressionLevel5.Put(writer)
143
			writer.Reset(&buf)
144
		case 6:
145
			writer = gzipWriterPoolForCompressionLevel6.Get().(*gzip.Writer)
146
			defer gzipWriterPoolForCompressionLevel6.Put(writer)
147
			writer.Reset(&buf)
148
		case 7:
149
			writer = gzipWriterPoolForCompressionLevel7.Get().(*gzip.Writer)
150
			defer gzipWriterPoolForCompressionLevel7.Put(writer)
151
			writer.Reset(&buf)
152
		case 8:
153
			writer = gzipWriterPoolForCompressionLevel8.Get().(*gzip.Writer)
154
			defer gzipWriterPoolForCompressionLevel8.Put(writer)
155
			writer.Reset(&buf)
156
		case 9:
157
			writer = gzipWriterPoolForCompressionLevel9.Get().(*gzip.Writer)
158
			defer gzipWriterPoolForCompressionLevel9.Put(writer)
159
			writer.Reset(&buf)
160
		default:
161
			writer, err = gzip.NewWriterLevel(&buf, level)
162
			if err != nil {
163
				return nil, err
164
			}
165
		}
166
		if _, err := writer.Write(data); err != nil {
167
			return nil, err
168
		}
169
		if err := writer.Close(); err != nil {
170
			return nil, err
171
		}
172
		return buf.Bytes(), nil
173
	case CompressionSnappy:
174
		return snappy.Encode(data), nil
175
	case CompressionLZ4:
176
		writer := lz4WriterPool.Get().(*lz4.Writer)
177
		defer lz4WriterPool.Put(writer)
178

179
		var buf bytes.Buffer
180
		writer.Reset(&buf)
181

182
		if _, err := writer.Write(data); err != nil {
183
			return nil, err
184
		}
185
		if err := writer.Close(); err != nil {
186
			return nil, err
187
		}
188
		return buf.Bytes(), nil
189
	case CompressionZSTD:
190
		return zstdCompress(ZstdEncoderParams{level}, nil, data)
191
	default:
192
		return nil, PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", cc)}
193
	}
194
}
195

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.