cubefs

Форк
0
/
delete_offsets_response.go 
112 строк · 2.3 Кб
1
package sarama
2

3
import (
4
	"time"
5
)
6

7
type DeleteOffsetsResponse struct {
8
	//The top-level error code, or 0 if there was no error.
9
	ErrorCode    KError
10
	ThrottleTime time.Duration
11
	//The responses for each partition of the topics.
12
	Errors map[string]map[int32]KError
13
}
14

15
func (r *DeleteOffsetsResponse) AddError(topic string, partition int32, errorCode KError) {
16
	if r.Errors == nil {
17
		r.Errors = make(map[string]map[int32]KError)
18
	}
19
	partitions := r.Errors[topic]
20
	if partitions == nil {
21
		partitions = make(map[int32]KError)
22
		r.Errors[topic] = partitions
23
	}
24
	partitions[partition] = errorCode
25
}
26

27
func (r *DeleteOffsetsResponse) encode(pe packetEncoder) error {
28
	pe.putInt16(int16(r.ErrorCode))
29
	pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
30

31
	if err := pe.putArrayLength(len(r.Errors)); err != nil {
32
		return err
33
	}
34
	for topic, partitions := range r.Errors {
35
		if err := pe.putString(topic); err != nil {
36
			return err
37
		}
38
		if err := pe.putArrayLength(len(partitions)); err != nil {
39
			return err
40
		}
41
		for partition, errorCode := range partitions {
42
			pe.putInt32(partition)
43
			pe.putInt16(int16(errorCode))
44
		}
45
	}
46
	return nil
47
}
48

49
func (r *DeleteOffsetsResponse) decode(pd packetDecoder, version int16) error {
50
	tmpErr, err := pd.getInt16()
51
	if err != nil {
52
		return err
53
	}
54
	r.ErrorCode = KError(tmpErr)
55

56
	throttleTime, err := pd.getInt32()
57
	if err != nil {
58
		return err
59
	}
60
	r.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
61

62
	numTopics, err := pd.getArrayLength()
63
	if err != nil || numTopics == 0 {
64
		return err
65
	}
66

67
	r.Errors = make(map[string]map[int32]KError, numTopics)
68
	for i := 0; i < numTopics; i++ {
69
		name, err := pd.getString()
70
		if err != nil {
71
			return err
72
		}
73

74
		numErrors, err := pd.getArrayLength()
75
		if err != nil {
76
			return err
77
		}
78

79
		r.Errors[name] = make(map[int32]KError, numErrors)
80

81
		for j := 0; j < numErrors; j++ {
82
			id, err := pd.getInt32()
83
			if err != nil {
84
				return err
85
			}
86

87
			tmp, err := pd.getInt16()
88
			if err != nil {
89
				return err
90
			}
91
			r.Errors[name][id] = KError(tmp)
92
		}
93
	}
94

95
	return nil
96
}
97

98
func (r *DeleteOffsetsResponse) key() int16 {
99
	return 47
100
}
101

102
func (r *DeleteOffsetsResponse) version() int16 {
103
	return 0
104
}
105

106
func (r *DeleteOffsetsResponse) headerVersion() int16 {
107
	return 0
108
}
109

110
func (r *DeleteOffsetsResponse) requiredVersion() KafkaVersion {
111
	return V2_4_0_0
112
}
113

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.