cubefs

Форк
0
203 строки · 4.0 Кб
1
package sarama
2

3
import (
4
	"encoding/binary"
5
	"fmt"
6
	"io"
7
)
8

9
type protocolBody interface {
10
	encoder
11
	versionedDecoder
12
	key() int16
13
	version() int16
14
	headerVersion() int16
15
	requiredVersion() KafkaVersion
16
}
17

18
type request struct {
19
	correlationID int32
20
	clientID      string
21
	body          protocolBody
22
}
23

24
func (r *request) encode(pe packetEncoder) error {
25
	pe.push(&lengthField{})
26
	pe.putInt16(r.body.key())
27
	pe.putInt16(r.body.version())
28
	pe.putInt32(r.correlationID)
29

30
	if r.body.headerVersion() >= 1 {
31
		err := pe.putString(r.clientID)
32
		if err != nil {
33
			return err
34
		}
35
	}
36

37
	if r.body.headerVersion() >= 2 {
38
		// we don't use tag headers at the moment so we just put an array length of 0
39
		pe.putUVarint(0)
40
	}
41

42
	err := r.body.encode(pe)
43
	if err != nil {
44
		return err
45
	}
46

47
	return pe.pop()
48
}
49

50
func (r *request) decode(pd packetDecoder) (err error) {
51
	key, err := pd.getInt16()
52
	if err != nil {
53
		return err
54
	}
55

56
	version, err := pd.getInt16()
57
	if err != nil {
58
		return err
59
	}
60

61
	r.correlationID, err = pd.getInt32()
62
	if err != nil {
63
		return err
64
	}
65

66
	r.clientID, err = pd.getString()
67
	if err != nil {
68
		return err
69
	}
70

71
	r.body = allocateBody(key, version)
72
	if r.body == nil {
73
		return PacketDecodingError{fmt.Sprintf("unknown request key (%d)", key)}
74
	}
75

76
	if r.body.headerVersion() >= 2 {
77
		// tagged field
78
		_, err = pd.getUVarint()
79
		if err != nil {
80
			return err
81
		}
82
	}
83

84
	return r.body.decode(pd, version)
85
}
86

87
func decodeRequest(r io.Reader) (*request, int, error) {
88
	var (
89
		bytesRead   int
90
		lengthBytes = make([]byte, 4)
91
	)
92

93
	if _, err := io.ReadFull(r, lengthBytes); err != nil {
94
		return nil, bytesRead, err
95
	}
96

97
	bytesRead += len(lengthBytes)
98
	length := int32(binary.BigEndian.Uint32(lengthBytes))
99

100
	if length <= 4 || length > MaxRequestSize {
101
		return nil, bytesRead, PacketDecodingError{fmt.Sprintf("message of length %d too large or too small", length)}
102
	}
103

104
	encodedReq := make([]byte, length)
105
	if _, err := io.ReadFull(r, encodedReq); err != nil {
106
		return nil, bytesRead, err
107
	}
108

109
	bytesRead += len(encodedReq)
110

111
	req := &request{}
112
	if err := decode(encodedReq, req); err != nil {
113
		return nil, bytesRead, err
114
	}
115

116
	return req, bytesRead, nil
117
}
118

119
func allocateBody(key, version int16) protocolBody {
120
	switch key {
121
	case 0:
122
		return &ProduceRequest{}
123
	case 1:
124
		return &FetchRequest{Version: version}
125
	case 2:
126
		return &OffsetRequest{Version: version}
127
	case 3:
128
		return &MetadataRequest{}
129
	case 8:
130
		return &OffsetCommitRequest{Version: version}
131
	case 9:
132
		return &OffsetFetchRequest{Version: version}
133
	case 10:
134
		return &FindCoordinatorRequest{}
135
	case 11:
136
		return &JoinGroupRequest{}
137
	case 12:
138
		return &HeartbeatRequest{}
139
	case 13:
140
		return &LeaveGroupRequest{}
141
	case 14:
142
		return &SyncGroupRequest{}
143
	case 15:
144
		return &DescribeGroupsRequest{}
145
	case 16:
146
		return &ListGroupsRequest{}
147
	case 17:
148
		return &SaslHandshakeRequest{}
149
	case 18:
150
		return &ApiVersionsRequest{Version: version}
151
	case 19:
152
		return &CreateTopicsRequest{}
153
	case 20:
154
		return &DeleteTopicsRequest{}
155
	case 21:
156
		return &DeleteRecordsRequest{}
157
	case 22:
158
		return &InitProducerIDRequest{}
159
	case 24:
160
		return &AddPartitionsToTxnRequest{}
161
	case 25:
162
		return &AddOffsetsToTxnRequest{}
163
	case 26:
164
		return &EndTxnRequest{}
165
	case 28:
166
		return &TxnOffsetCommitRequest{}
167
	case 29:
168
		return &DescribeAclsRequest{}
169
	case 30:
170
		return &CreateAclsRequest{}
171
	case 31:
172
		return &DeleteAclsRequest{}
173
	case 32:
174
		return &DescribeConfigsRequest{}
175
	case 33:
176
		return &AlterConfigsRequest{}
177
	case 35:
178
		return &DescribeLogDirsRequest{}
179
	case 36:
180
		return &SaslAuthenticateRequest{}
181
	case 37:
182
		return &CreatePartitionsRequest{}
183
	case 42:
184
		return &DeleteGroupsRequest{}
185
	case 44:
186
		return &IncrementalAlterConfigsRequest{}
187
	case 45:
188
		return &AlterPartitionReassignmentsRequest{}
189
	case 46:
190
		return &ListPartitionReassignmentsRequest{}
191
	case 47:
192
		return &DeleteOffsetsRequest{}
193
	case 48:
194
		return &DescribeClientQuotasRequest{}
195
	case 49:
196
		return &AlterClientQuotasRequest{}
197
	case 50:
198
		return &DescribeUserScramCredentialsRequest{}
199
	case 51:
200
		return &AlterUserScramCredentialsRequest{}
201
	}
202
	return nil
203
}
204

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.