cubefs

Форк
0
/
consumer_group_members.go 
139 строк · 3.0 Кб
1
package sarama
2

3
import "errors"
4

5
// ConsumerGroupMemberMetadata holds the metadata for consumer group
6
// https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/ConsumerProtocolSubscription.json
7
type ConsumerGroupMemberMetadata struct {
8
	Version         int16
9
	Topics          []string
10
	UserData        []byte
11
	OwnedPartitions []*OwnedPartition
12
}
13

14
func (m *ConsumerGroupMemberMetadata) encode(pe packetEncoder) error {
15
	pe.putInt16(m.Version)
16

17
	if err := pe.putStringArray(m.Topics); err != nil {
18
		return err
19
	}
20

21
	if err := pe.putBytes(m.UserData); err != nil {
22
		return err
23
	}
24

25
	return nil
26
}
27

28
func (m *ConsumerGroupMemberMetadata) decode(pd packetDecoder) (err error) {
29
	if m.Version, err = pd.getInt16(); err != nil {
30
		return
31
	}
32

33
	if m.Topics, err = pd.getStringArray(); err != nil {
34
		return
35
	}
36

37
	if m.UserData, err = pd.getBytes(); err != nil {
38
		return
39
	}
40
	if m.Version >= 1 {
41
		n, err := pd.getArrayLength()
42
		if err != nil {
43
			// permit missing data here in case of misbehaving 3rd party
44
			// clients who incorrectly marked the member metadata as V1 in
45
			// their JoinGroup request
46
			if errors.Is(err, ErrInsufficientData) {
47
				return nil
48
			}
49
			return err
50
		}
51
		if n == 0 {
52
			return nil
53
		}
54
		m.OwnedPartitions = make([]*OwnedPartition, n)
55
		for i := 0; i < n; i++ {
56
			m.OwnedPartitions[i] = &OwnedPartition{}
57
			if err := m.OwnedPartitions[i].decode(pd); err != nil {
58
				return err
59
			}
60
		}
61
	}
62

63
	return nil
64
}
65

66
type OwnedPartition struct {
67
	Topic      string
68
	Partitions []int32
69
}
70

71
func (m *OwnedPartition) decode(pd packetDecoder) (err error) {
72
	if m.Topic, err = pd.getString(); err != nil {
73
		return err
74
	}
75
	if m.Partitions, err = pd.getInt32Array(); err != nil {
76
		return err
77
	}
78

79
	return nil
80
}
81

82
// ConsumerGroupMemberAssignment holds the member assignment for a consume group
83
// https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/ConsumerProtocolAssignment.json
84
type ConsumerGroupMemberAssignment struct {
85
	Version  int16
86
	Topics   map[string][]int32
87
	UserData []byte
88
}
89

90
func (m *ConsumerGroupMemberAssignment) encode(pe packetEncoder) error {
91
	pe.putInt16(m.Version)
92

93
	if err := pe.putArrayLength(len(m.Topics)); err != nil {
94
		return err
95
	}
96

97
	for topic, partitions := range m.Topics {
98
		if err := pe.putString(topic); err != nil {
99
			return err
100
		}
101
		if err := pe.putInt32Array(partitions); err != nil {
102
			return err
103
		}
104
	}
105

106
	if err := pe.putBytes(m.UserData); err != nil {
107
		return err
108
	}
109

110
	return nil
111
}
112

113
func (m *ConsumerGroupMemberAssignment) decode(pd packetDecoder) (err error) {
114
	if m.Version, err = pd.getInt16(); err != nil {
115
		return
116
	}
117

118
	var topicLen int
119
	if topicLen, err = pd.getArrayLength(); err != nil {
120
		return
121
	}
122

123
	m.Topics = make(map[string][]int32, topicLen)
124
	for i := 0; i < topicLen; i++ {
125
		var topic string
126
		if topic, err = pd.getString(); err != nil {
127
			return
128
		}
129
		if m.Topics[topic], err = pd.getInt32Array(); err != nil {
130
			return
131
		}
132
	}
133

134
	if m.UserData, err = pd.getBytes(); err != nil {
135
		return
136
	}
137

138
	return nil
139
}
140

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.