cubefs

Форк
0
/
join_group_request.go 
167 строк · 3.4 Кб
1
package sarama
2

3
type GroupProtocol struct {
4
	Name     string
5
	Metadata []byte
6
}
7

8
func (p *GroupProtocol) decode(pd packetDecoder) (err error) {
9
	p.Name, err = pd.getString()
10
	if err != nil {
11
		return err
12
	}
13
	p.Metadata, err = pd.getBytes()
14
	return err
15
}
16

17
func (p *GroupProtocol) encode(pe packetEncoder) (err error) {
18
	if err := pe.putString(p.Name); err != nil {
19
		return err
20
	}
21
	if err := pe.putBytes(p.Metadata); err != nil {
22
		return err
23
	}
24
	return nil
25
}
26

27
type JoinGroupRequest struct {
28
	Version               int16
29
	GroupId               string
30
	SessionTimeout        int32
31
	RebalanceTimeout      int32
32
	MemberId              string
33
	ProtocolType          string
34
	GroupProtocols        map[string][]byte // deprecated; use OrderedGroupProtocols
35
	OrderedGroupProtocols []*GroupProtocol
36
}
37

38
func (r *JoinGroupRequest) encode(pe packetEncoder) error {
39
	if err := pe.putString(r.GroupId); err != nil {
40
		return err
41
	}
42
	pe.putInt32(r.SessionTimeout)
43
	if r.Version >= 1 {
44
		pe.putInt32(r.RebalanceTimeout)
45
	}
46
	if err := pe.putString(r.MemberId); err != nil {
47
		return err
48
	}
49
	if err := pe.putString(r.ProtocolType); err != nil {
50
		return err
51
	}
52

53
	if len(r.GroupProtocols) > 0 {
54
		if len(r.OrderedGroupProtocols) > 0 {
55
			return PacketDecodingError{"cannot specify both GroupProtocols and OrderedGroupProtocols on JoinGroupRequest"}
56
		}
57

58
		if err := pe.putArrayLength(len(r.GroupProtocols)); err != nil {
59
			return err
60
		}
61
		for name, metadata := range r.GroupProtocols {
62
			if err := pe.putString(name); err != nil {
63
				return err
64
			}
65
			if err := pe.putBytes(metadata); err != nil {
66
				return err
67
			}
68
		}
69
	} else {
70
		if err := pe.putArrayLength(len(r.OrderedGroupProtocols)); err != nil {
71
			return err
72
		}
73
		for _, protocol := range r.OrderedGroupProtocols {
74
			if err := protocol.encode(pe); err != nil {
75
				return err
76
			}
77
		}
78
	}
79

80
	return nil
81
}
82

83
func (r *JoinGroupRequest) decode(pd packetDecoder, version int16) (err error) {
84
	r.Version = version
85

86
	if r.GroupId, err = pd.getString(); err != nil {
87
		return
88
	}
89

90
	if r.SessionTimeout, err = pd.getInt32(); err != nil {
91
		return
92
	}
93

94
	if version >= 1 {
95
		if r.RebalanceTimeout, err = pd.getInt32(); err != nil {
96
			return err
97
		}
98
	}
99

100
	if r.MemberId, err = pd.getString(); err != nil {
101
		return
102
	}
103

104
	if r.ProtocolType, err = pd.getString(); err != nil {
105
		return
106
	}
107

108
	n, err := pd.getArrayLength()
109
	if err != nil {
110
		return err
111
	}
112
	if n == 0 {
113
		return nil
114
	}
115

116
	r.GroupProtocols = make(map[string][]byte)
117
	for i := 0; i < n; i++ {
118
		protocol := &GroupProtocol{}
119
		if err := protocol.decode(pd); err != nil {
120
			return err
121
		}
122
		r.GroupProtocols[protocol.Name] = protocol.Metadata
123
		r.OrderedGroupProtocols = append(r.OrderedGroupProtocols, protocol)
124
	}
125

126
	return nil
127
}
128

129
func (r *JoinGroupRequest) key() int16 {
130
	return 11
131
}
132

133
func (r *JoinGroupRequest) version() int16 {
134
	return r.Version
135
}
136

137
func (r *JoinGroupRequest) headerVersion() int16 {
138
	return 1
139
}
140

141
func (r *JoinGroupRequest) requiredVersion() KafkaVersion {
142
	switch r.Version {
143
	case 2:
144
		return V0_11_0_0
145
	case 1:
146
		return V0_10_1_0
147
	default:
148
		return V0_9_0_0
149
	}
150
}
151

152
func (r *JoinGroupRequest) AddGroupProtocol(name string, metadata []byte) {
153
	r.OrderedGroupProtocols = append(r.OrderedGroupProtocols, &GroupProtocol{
154
		Name:     name,
155
		Metadata: metadata,
156
	})
157
}
158

159
func (r *JoinGroupRequest) AddGroupProtocolMetadata(name string, metadata *ConsumerGroupMemberMetadata) error {
160
	bin, err := encode(metadata, nil)
161
	if err != nil {
162
		return err
163
	}
164

165
	r.AddGroupProtocol(name, bin)
166
	return nil
167
}
168

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.