package sarama

import (
	"bufio"
	"fmt"
	"net"
	"regexp"
)

// none is an empty placeholder type (zero bytes), typically used as the value
// type of maps that are treated as sets.
type none struct{}

// make []int32 sortable so we can sort partition numbers
type int32Slice []int32

func (slice int32Slice) Len() int {
	return len(slice)
}

func (slice int32Slice) Less(i, j int) bool {
	return slice[i] < slice[j]
}

func (slice int32Slice) Swap(i, j int) {
	slice[i], slice[j] = slice[j], slice[i]
}
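
// Usage sketch (illustrative): since int32Slice implements sort.Interface,
// partition numbers can be ordered with the standard library "sort" package.
// The slice contents here are hypothetical.
//
//	partitions := []int32{3, 1, 2}
//	sort.Sort(int32Slice(partitions)) // partitions is now [1 2 3]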

// dupInt32Slice returns a copy of the input slice.
func dupInt32Slice(input []int32) []int32 {
	ret := make([]int32, 0, len(input))
	ret = append(ret, input...)
	return ret
}

// withRecover runs fn, recovering from any panic and passing it to the
// package-level PanicHandler if one has been set; with no handler set, a
// panic propagates as usual.
func withRecover(fn func()) {
	defer func() {
		handler := PanicHandler
		if handler != nil {
			if err := recover(); err != nil {
				handler(err)
			}
		}
	}()

	fn()
}
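
// Usage sketch (illustrative): installing a handler and running a goroutine
// through withRecover. doWork is a hypothetical workload.
//
//	PanicHandler = func(p interface{}) {
//	    Logger.Println("recovered panic:", p)
//	}
//	go withRecover(func() { doWork() })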

// safeAsyncClose closes the broker in a background goroutine, logging (rather
// than propagating) any error from Close.
func safeAsyncClose(b *Broker) {
	tmp := b // local var prevents clobbering in goroutine
	go withRecover(func() {
		if connected, _ := tmp.Connected(); connected {
			if err := tmp.Close(); err != nil {
				Logger.Println("Error closing broker", tmp.ID(), ":", err)
			}
		}
	})
}

// Encoder is a simple interface for any type that can be encoded as an array of bytes
// in order to be sent as the key or value of a Kafka message. Length() is provided as an
// optimization, and must return the same as len() on the result of Encode().
type Encoder interface {
	Encode() ([]byte, error)
	Length() int
}
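
// Implementation sketch (illustrative, not part of this package): any type can
// satisfy Encoder by returning its serialized bytes and their length. The
// jsonEncoder type and its use of encoding/json are hypothetical; note that it
// re-encodes inside Length(), which satisfies the contract but is not efficient.
//
//	type jsonEncoder struct{ v interface{} }
//
//	func (j jsonEncoder) Encode() ([]byte, error) { return json.Marshal(j.v) }
//
//	func (j jsonEncoder) Length() int {
//	    b, _ := json.Marshal(j.v)
//	    return len(b)
//	}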

// make strings and byte slices encodable for convenience so they can be used as keys
// and/or values in kafka messages

// StringEncoder implements the Encoder interface for Go strings so that they can be used
// as the Key or Value in a ProducerMessage.
type StringEncoder string

func (s StringEncoder) Encode() ([]byte, error) {
	return []byte(s), nil
}

func (s StringEncoder) Length() int {
	return len(s)
}

// ByteEncoder implements the Encoder interface for Go byte slices so that they can be used
// as the Key or Value in a ProducerMessage.
type ByteEncoder []byte

func (b ByteEncoder) Encode() ([]byte, error) {
	return b, nil
}

func (b ByteEncoder) Length() int {
	return len(b)
}
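
// Usage sketch (illustrative): both encoders plug directly into the Key and
// Value fields of a ProducerMessage (defined elsewhere in this package). The
// topic and payload below are hypothetical.
//
//	msg := &ProducerMessage{
//	    Topic: "events",
//	    Key:   StringEncoder("user-42"),
//	    Value: ByteEncoder([]byte(`{"action":"login"}`)),
//	}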

// bufConn wraps a net.Conn with a buffer for reads to reduce the number of
// reads that trigger syscalls.
type bufConn struct {
	net.Conn
	buf *bufio.Reader
}

func newBufConn(conn net.Conn) *bufConn {
	return &bufConn{
		Conn: conn,
		buf:  bufio.NewReader(conn),
	}
}

func (bc *bufConn) Read(b []byte) (n int, err error) {
	return bc.buf.Read(b)
}
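
// Usage sketch (illustrative): wrapping a dialed connection so that many small
// protocol reads are served from the buffer rather than individual syscalls.
// The address is hypothetical.
//
//	conn, err := net.Dial("tcp", "localhost:9092")
//	if err != nil {
//	    return err
//	}
//	bc := newBufConn(conn) // bc still satisfies net.Conn; reads are buffered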

// KafkaVersion instances represent versions of the upstream Kafka broker.
type KafkaVersion struct {
	// it's a struct rather than just typing the array directly to make it opaque and stop people
	// generating their own arbitrary versions
	version [4]uint
}

func newKafkaVersion(major, minor, veryMinor, patch uint) KafkaVersion {
	return KafkaVersion{
		version: [4]uint{major, minor, veryMinor, patch},
	}
}

// IsAtLeast returns true if and only if the version it is called on is
// greater than or equal to the version passed in:
//    V1.IsAtLeast(V2) // false
//    V2.IsAtLeast(V1) // true
func (v KafkaVersion) IsAtLeast(other KafkaVersion) bool {
	for i := range v.version {
		if v.version[i] > other.version[i] {
			return true
		} else if v.version[i] < other.version[i] {
			return false
		}
	}
	return true
}

// Effective constants defining the supported kafka versions.
var (
	V0_8_2_0  = newKafkaVersion(0, 8, 2, 0)
	V0_8_2_1  = newKafkaVersion(0, 8, 2, 1)
	V0_8_2_2  = newKafkaVersion(0, 8, 2, 2)
	V0_9_0_0  = newKafkaVersion(0, 9, 0, 0)
	V0_9_0_1  = newKafkaVersion(0, 9, 0, 1)
	V0_10_0_0 = newKafkaVersion(0, 10, 0, 0)
	V0_10_0_1 = newKafkaVersion(0, 10, 0, 1)
	V0_10_1_0 = newKafkaVersion(0, 10, 1, 0)
	V0_10_1_1 = newKafkaVersion(0, 10, 1, 1)
	V0_10_2_0 = newKafkaVersion(0, 10, 2, 0)
	V0_10_2_1 = newKafkaVersion(0, 10, 2, 1)
	V0_10_2_2 = newKafkaVersion(0, 10, 2, 2)
	V0_11_0_0 = newKafkaVersion(0, 11, 0, 0)
	V0_11_0_1 = newKafkaVersion(0, 11, 0, 1)
	V0_11_0_2 = newKafkaVersion(0, 11, 0, 2)
	V1_0_0_0  = newKafkaVersion(1, 0, 0, 0)
	V1_0_1_0  = newKafkaVersion(1, 0, 1, 0)
	V1_0_2_0  = newKafkaVersion(1, 0, 2, 0)
	V1_1_0_0  = newKafkaVersion(1, 1, 0, 0)
	V1_1_1_0  = newKafkaVersion(1, 1, 1, 0)
	V2_0_0_0  = newKafkaVersion(2, 0, 0, 0)
	V2_0_1_0  = newKafkaVersion(2, 0, 1, 0)
	V2_1_0_0  = newKafkaVersion(2, 1, 0, 0)
	V2_1_1_0  = newKafkaVersion(2, 1, 1, 0)
	V2_2_0_0  = newKafkaVersion(2, 2, 0, 0)
	V2_2_1_0  = newKafkaVersion(2, 2, 1, 0)
	V2_2_2_0  = newKafkaVersion(2, 2, 2, 0)
	V2_3_0_0  = newKafkaVersion(2, 3, 0, 0)
	V2_3_1_0  = newKafkaVersion(2, 3, 1, 0)
	V2_4_0_0  = newKafkaVersion(2, 4, 0, 0)
	V2_4_1_0  = newKafkaVersion(2, 4, 1, 0)
	V2_5_0_0  = newKafkaVersion(2, 5, 0, 0)
	V2_5_1_0  = newKafkaVersion(2, 5, 1, 0)
	V2_6_0_0  = newKafkaVersion(2, 6, 0, 0)
	V2_6_1_0  = newKafkaVersion(2, 6, 1, 0)
	V2_6_2_0  = newKafkaVersion(2, 6, 2, 0)
	V2_6_3_0  = newKafkaVersion(2, 6, 3, 0)
	V2_7_0_0  = newKafkaVersion(2, 7, 0, 0)
	V2_7_1_0  = newKafkaVersion(2, 7, 1, 0)
	V2_7_2_0  = newKafkaVersion(2, 7, 2, 0)
	V2_8_0_0  = newKafkaVersion(2, 8, 0, 0)
	V2_8_1_0  = newKafkaVersion(2, 8, 1, 0)
	V3_0_0_0  = newKafkaVersion(3, 0, 0, 0)
	V3_0_1_0  = newKafkaVersion(3, 0, 1, 0)
	V3_1_0_0  = newKafkaVersion(3, 1, 0, 0)

	SupportedVersions = []KafkaVersion{
		V0_8_2_0,
		V0_8_2_1,
		V0_8_2_2,
		V0_9_0_0,
		V0_9_0_1,
		V0_10_0_0,
		V0_10_0_1,
		V0_10_1_0,
		V0_10_1_1,
		V0_10_2_0,
		V0_10_2_1,
		V0_10_2_2,
		V0_11_0_0,
		V0_11_0_1,
		V0_11_0_2,
		V1_0_0_0,
		V1_0_1_0,
		V1_0_2_0,
		V1_1_0_0,
		V1_1_1_0,
		V2_0_0_0,
		V2_0_1_0,
		V2_1_0_0,
		V2_1_1_0,
		V2_2_0_0,
		V2_2_1_0,
		V2_2_2_0,
		V2_3_0_0,
		V2_3_1_0,
		V2_4_0_0,
		V2_4_1_0,
		V2_5_0_0,
		V2_5_1_0,
		V2_6_0_0,
		V2_6_1_0,
		V2_6_2_0,
		V2_7_0_0,
		V2_7_1_0,
		V2_8_0_0,
		V2_8_1_0,
		V3_0_0_0,
		V3_0_1_0,
		V3_1_0_0,
	}
	MinVersion     = V0_8_2_0
	MaxVersion     = V3_1_0_0
	DefaultVersion = V1_0_0_0
)
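
// Usage sketch (illustrative): callers typically assign one of these constants
// to Config.Version (defined elsewhere in this package) so the client uses the
// matching protocol features; the choice of V2_6_0_0 here is arbitrary.
//
//	cfg := NewConfig()
//	cfg.Version = V2_6_0_0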

// ParseKafkaVersion parses a version string and returns the corresponding
// KafkaVersion. Versions starting with "0" must have four components
// (e.g. "0.10.2.0"); later versions have three (e.g. "2.8.1"). On failure it
// returns DefaultVersion together with an error.
func ParseKafkaVersion(s string) (KafkaVersion, error) {
	if len(s) < 5 {
		return DefaultVersion, fmt.Errorf("invalid version `%s`", s)
	}
	var major, minor, veryMinor, patch uint
	var err error
	if s[0] == '0' {
		err = scanKafkaVersion(s, `^0\.\d+\.\d+\.\d+$`, "0.%d.%d.%d", [3]*uint{&minor, &veryMinor, &patch})
	} else {
		err = scanKafkaVersion(s, `^\d+\.\d+\.\d+$`, "%d.%d.%d", [3]*uint{&major, &minor, &veryMinor})
	}
	if err != nil {
		return DefaultVersion, err
	}
	return newKafkaVersion(major, minor, veryMinor, patch), nil
}
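
// Usage sketch (illustrative): parsing a user-supplied version string and then
// gating a feature on it. The input string is hypothetical.
//
//	v, err := ParseKafkaVersion("2.8.1")
//	if err != nil {
//	    return err
//	}
//	if v.IsAtLeast(V0_11_0_0) {
//	    // record headers are available (introduced in Kafka 0.11)
//	}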

// scanKafkaVersion checks s against the given regexp pattern and, if it
// matches, scans the numeric components into v using the given format.
func scanKafkaVersion(s string, pattern string, format string, v [3]*uint) error {
	if !regexp.MustCompile(pattern).MatchString(s) {
		return fmt.Errorf("invalid version `%s`", s)
	}
	_, err := fmt.Sscanf(s, format, v[0], v[1], v[2])
	return err
}

// String renders the version with four components for 0.x.y.z releases and
// three components otherwise.
func (v KafkaVersion) String() string {
	if v.version[0] == 0 {
		return fmt.Sprintf("0.%d.%d.%d", v.version[1], v.version[2], v.version[3])
	}

	return fmt.Sprintf("%d.%d.%d", v.version[0], v.version[1], v.version[2])
}
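
// Output sketch (illustrative): 0.x versions keep all four components, while
// versions 1.0 and later print only the first three.
//
//	V0_10_2_1.String() // "0.10.2.1"
//	V2_8_1_0.String()  // "2.8.1"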