cubefs

Форк
0
/
sync_group_request.go 
104 строки · 2.0 Кб
1
package sarama
2

3
type SyncGroupRequest struct {
4
	GroupId          string
5
	GenerationId     int32
6
	MemberId         string
7
	GroupAssignments map[string][]byte
8
}
9

10
func (r *SyncGroupRequest) encode(pe packetEncoder) error {
11
	if err := pe.putString(r.GroupId); err != nil {
12
		return err
13
	}
14

15
	pe.putInt32(r.GenerationId)
16

17
	if err := pe.putString(r.MemberId); err != nil {
18
		return err
19
	}
20

21
	if err := pe.putArrayLength(len(r.GroupAssignments)); err != nil {
22
		return err
23
	}
24
	for memberId, memberAssignment := range r.GroupAssignments {
25
		if err := pe.putString(memberId); err != nil {
26
			return err
27
		}
28
		if err := pe.putBytes(memberAssignment); err != nil {
29
			return err
30
		}
31
	}
32

33
	return nil
34
}
35

36
func (r *SyncGroupRequest) decode(pd packetDecoder, version int16) (err error) {
37
	if r.GroupId, err = pd.getString(); err != nil {
38
		return
39
	}
40
	if r.GenerationId, err = pd.getInt32(); err != nil {
41
		return
42
	}
43
	if r.MemberId, err = pd.getString(); err != nil {
44
		return
45
	}
46

47
	n, err := pd.getArrayLength()
48
	if err != nil {
49
		return err
50
	}
51
	if n == 0 {
52
		return nil
53
	}
54

55
	r.GroupAssignments = make(map[string][]byte)
56
	for i := 0; i < n; i++ {
57
		memberId, err := pd.getString()
58
		if err != nil {
59
			return err
60
		}
61
		memberAssignment, err := pd.getBytes()
62
		if err != nil {
63
			return err
64
		}
65

66
		r.GroupAssignments[memberId] = memberAssignment
67
	}
68

69
	return nil
70
}
71

72
func (r *SyncGroupRequest) key() int16 {
73
	return 14
74
}
75

76
func (r *SyncGroupRequest) version() int16 {
77
	return 0
78
}
79

80
func (r *SyncGroupRequest) headerVersion() int16 {
81
	return 1
82
}
83

84
func (r *SyncGroupRequest) requiredVersion() KafkaVersion {
85
	return V0_9_0_0
86
}
87

88
func (r *SyncGroupRequest) AddGroupAssignment(memberId string, memberAssignment []byte) {
89
	if r.GroupAssignments == nil {
90
		r.GroupAssignments = make(map[string][]byte)
91
	}
92

93
	r.GroupAssignments[memberId] = memberAssignment
94
}
95

96
func (r *SyncGroupRequest) AddGroupAssignmentMember(memberId string, memberAssignment *ConsumerGroupMemberAssignment) error {
97
	bin, err := encode(memberAssignment, nil)
98
	if err != nil {
99
		return err
100
	}
101

102
	r.AddGroupAssignment(memberId, bin)
103
	return nil
104
}
105

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.