cubefs

Форк
0
/
offset_request.go 
179 строк · 3.4 Кб
1
package sarama
2

3
type offsetRequestBlock struct {
4
	time       int64
5
	maxOffsets int32 // Only used in version 0
6
}
7

8
func (b *offsetRequestBlock) encode(pe packetEncoder, version int16) error {
9
	pe.putInt64(b.time)
10
	if version == 0 {
11
		pe.putInt32(b.maxOffsets)
12
	}
13

14
	return nil
15
}
16

17
func (b *offsetRequestBlock) decode(pd packetDecoder, version int16) (err error) {
18
	if b.time, err = pd.getInt64(); err != nil {
19
		return err
20
	}
21
	if version == 0 {
22
		if b.maxOffsets, err = pd.getInt32(); err != nil {
23
			return err
24
		}
25
	}
26
	return nil
27
}
28

29
type OffsetRequest struct {
30
	Version        int16
31
	IsolationLevel IsolationLevel
32
	replicaID      int32
33
	isReplicaIDSet bool
34
	blocks         map[string]map[int32]*offsetRequestBlock
35
}
36

37
func (r *OffsetRequest) encode(pe packetEncoder) error {
38
	if r.isReplicaIDSet {
39
		pe.putInt32(r.replicaID)
40
	} else {
41
		// default replica ID is always -1 for clients
42
		pe.putInt32(-1)
43
	}
44

45
	if r.Version >= 2 {
46
		pe.putBool(r.IsolationLevel == ReadCommitted)
47
	}
48

49
	err := pe.putArrayLength(len(r.blocks))
50
	if err != nil {
51
		return err
52
	}
53
	for topic, partitions := range r.blocks {
54
		err = pe.putString(topic)
55
		if err != nil {
56
			return err
57
		}
58
		err = pe.putArrayLength(len(partitions))
59
		if err != nil {
60
			return err
61
		}
62
		for partition, block := range partitions {
63
			pe.putInt32(partition)
64
			if err = block.encode(pe, r.Version); err != nil {
65
				return err
66
			}
67
		}
68
	}
69
	return nil
70
}
71

72
func (r *OffsetRequest) decode(pd packetDecoder, version int16) error {
73
	r.Version = version
74

75
	replicaID, err := pd.getInt32()
76
	if err != nil {
77
		return err
78
	}
79
	if replicaID >= 0 {
80
		r.SetReplicaID(replicaID)
81
	}
82

83
	if r.Version >= 2 {
84
		tmp, err := pd.getBool()
85
		if err != nil {
86
			return err
87
		}
88

89
		r.IsolationLevel = ReadUncommitted
90
		if tmp {
91
			r.IsolationLevel = ReadCommitted
92
		}
93
	}
94

95
	blockCount, err := pd.getArrayLength()
96
	if err != nil {
97
		return err
98
	}
99
	if blockCount == 0 {
100
		return nil
101
	}
102
	r.blocks = make(map[string]map[int32]*offsetRequestBlock)
103
	for i := 0; i < blockCount; i++ {
104
		topic, err := pd.getString()
105
		if err != nil {
106
			return err
107
		}
108
		partitionCount, err := pd.getArrayLength()
109
		if err != nil {
110
			return err
111
		}
112
		r.blocks[topic] = make(map[int32]*offsetRequestBlock)
113
		for j := 0; j < partitionCount; j++ {
114
			partition, err := pd.getInt32()
115
			if err != nil {
116
				return err
117
			}
118
			block := &offsetRequestBlock{}
119
			if err := block.decode(pd, version); err != nil {
120
				return err
121
			}
122
			r.blocks[topic][partition] = block
123
		}
124
	}
125
	return nil
126
}
127

128
func (r *OffsetRequest) key() int16 {
129
	return 2
130
}
131

132
func (r *OffsetRequest) version() int16 {
133
	return r.Version
134
}
135

136
func (r *OffsetRequest) headerVersion() int16 {
137
	return 1
138
}
139

140
func (r *OffsetRequest) requiredVersion() KafkaVersion {
141
	switch r.Version {
142
	case 1:
143
		return V0_10_1_0
144
	case 2:
145
		return V0_11_0_0
146
	default:
147
		return MinVersion
148
	}
149
}
150

151
func (r *OffsetRequest) SetReplicaID(id int32) {
152
	r.replicaID = id
153
	r.isReplicaIDSet = true
154
}
155

156
func (r *OffsetRequest) ReplicaID() int32 {
157
	if r.isReplicaIDSet {
158
		return r.replicaID
159
	}
160
	return -1
161
}
162

163
func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time int64, maxOffsets int32) {
164
	if r.blocks == nil {
165
		r.blocks = make(map[string]map[int32]*offsetRequestBlock)
166
	}
167

168
	if r.blocks[topic] == nil {
169
		r.blocks[topic] = make(map[int32]*offsetRequestBlock)
170
	}
171

172
	tmp := new(offsetRequestBlock)
173
	tmp.time = time
174
	if r.Version == 0 {
175
		tmp.maxOffsets = maxOffsets
176
	}
177

178
	r.blocks[topic][partitionID] = tmp
179
}
180

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.