package sarama
import (
	"fmt"
	"time"
)
const (
9
	// CompressionNone no compression
10
	CompressionNone CompressionCodec = iota
11
	// CompressionGZIP compression using GZIP
12
	CompressionGZIP
13
	// CompressionSnappy compression using snappy
14
	CompressionSnappy
15
	// CompressionLZ4 compression using LZ4
16
	CompressionLZ4
17
	// CompressionZSTD compression using ZSTD
18
	CompressionZSTD
19

20
	// The lowest 3 bits contain the compression codec used for the message
21
	compressionCodecMask int8 = 0x07
22

23
	// Bit 3 set for "LogAppend" timestamps
24
	timestampTypeMask = 0x08
25

26
	// CompressionLevelDefault is the constant to use in CompressionLevel
27
	// to have the default compression level for any codec. The value is picked
28
	// that we don't use any existing compression levels.
29
	CompressionLevelDefault = -1000
30
)
31

32
// CompressionCodec represents the various compression codecs recognized by Kafka in messages.
33
type CompressionCodec int8
func (cc CompressionCodec) String() string {
36
	return []string{
37
		"none",
38
		"gzip",
39
		"snappy",
40
		"lz4",
41
		"zstd",
42
	}[int(cc)]
43
}
// UnmarshalText returns a CompressionCodec from its string representation.
46
func (cc *CompressionCodec) UnmarshalText(text []byte) error {
47
	codecs := map[string]CompressionCodec{
48
		"none":   CompressionNone,
49
		"gzip":   CompressionGZIP,
50
		"snappy": CompressionSnappy,
51
		"lz4":    CompressionLZ4,
52
		"zstd":   CompressionZSTD,
53
	}
54
	codec, ok := codecs[string(text)]
55
	if !ok {
56
		return fmt.Errorf("cannot parse %q as a compression codec", string(text))
57
	}
58
	*cc = codec
59
	return nil
60
}
// MarshalText transforms a CompressionCodec into its string representation.
63
func (cc CompressionCodec) MarshalText() ([]byte, error) {
64
	return []byte(cc.String()), nil
65
}
// Message is a kafka message type
68
type Message struct {
69
	Codec            CompressionCodec // codec used to compress the message contents
70
	CompressionLevel int              // compression level
71
	LogAppendTime    bool             // the used timestamp is LogAppendTime
72
	Key              []byte           // the message key, may be nil
73
	Value            []byte           // the message contents
74
	Set              *MessageSet      // the message set a message might wrap
75
	Version          int8             // v1 requires Kafka 0.10
76
	Timestamp        time.Time        // the timestamp of the message (version 1+ only)
77

78
	compressedCache []byte
79
	compressedSize  int // used for computing the compression ratio metrics
80
}
func (m *Message) encode(pe packetEncoder) error {
83
	pe.push(newCRC32Field(crcIEEE))
84

85
	pe.putInt8(m.Version)
86

87
	attributes := int8(m.Codec) & compressionCodecMask
88
	if m.LogAppendTime {
89
		attributes |= timestampTypeMask
90
	}
91
	pe.putInt8(attributes)
92

93
	if m.Version >= 1 {
94
		if err := (Timestamp{&m.Timestamp}).encode(pe); err != nil {
95
			return err
96
		}
97
	}
98

99
	err := pe.putBytes(m.Key)
100
	if err != nil {
101
		return err
102
	}
103

104
	var payload []byte
105

106
	if m.compressedCache != nil {
107
		payload = m.compressedCache
108
		m.compressedCache = nil
109
	} else if m.Value != nil {
110
		payload, err = compress(m.Codec, m.CompressionLevel, m.Value)
111
		if err != nil {
112
			return err
113
		}
114
		m.compressedCache = payload
115
		// Keep in mind the compressed payload size for metric gathering
116
		m.compressedSize = len(payload)
117
	}
118

119
	if err = pe.putBytes(payload); err != nil {
120
		return err
121
	}
122

123
	return pe.pop()
124
}
func (m *Message) decode(pd packetDecoder) (err error) {
127
	crc32Decoder := acquireCrc32Field(crcIEEE)
128
	defer releaseCrc32Field(crc32Decoder)
129

130
	err = pd.push(crc32Decoder)
131
	if err != nil {
132
		return err
133
	}
134

135
	m.Version, err = pd.getInt8()
136
	if err != nil {
137
		return err
138
	}
139

140
	if m.Version > 1 {
141
		return PacketDecodingError{fmt.Sprintf("unknown magic byte (%v)", m.Version)}
142
	}
143

144
	attribute, err := pd.getInt8()
145
	if err != nil {
146
		return err
147
	}
148
	m.Codec = CompressionCodec(attribute & compressionCodecMask)
149
	m.LogAppendTime = attribute&timestampTypeMask == timestampTypeMask
150

151
	if m.Version == 1 {
152
		if err := (Timestamp{&m.Timestamp}).decode(pd); err != nil {
153
			return err
154
		}
155
	}
156

157
	m.Key, err = pd.getBytes()
158
	if err != nil {
159
		return err
160
	}
161

162
	m.Value, err = pd.getBytes()
163
	if err != nil {
164
		return err
165
	}
166

167
	// Required for deep equal assertion during tests but might be useful
168
	// for future metrics about the compression ratio in fetch requests
169
	m.compressedSize = len(m.Value)
170

171
	if m.Value != nil && m.Codec != CompressionNone {
172
		m.Value, err = decompress(m.Codec, m.Value)
173
		if err != nil {
174
			return err
175
		}
176

177
		if err := m.decodeSet(); err != nil {
178
			return err
179
		}
180
	}
181

182
	return pd.pop()
183
}
// decodes a message set from a previously encoded bulk-message
186
func (m *Message) decodeSet() (err error) {
187
	pd := realDecoder{raw: m.Value}
188
	m.Set = &MessageSet{}
189
	return m.Set.decode(&pd)
190
}