cubefs

Форк
0
/
create_topics_response.go 
131 строка · 2.3 Кб
1
package sarama
2

3
import (
4
	"fmt"
5
	"time"
6
)
7

8
type CreateTopicsResponse struct {
9
	Version      int16
10
	ThrottleTime time.Duration
11
	TopicErrors  map[string]*TopicError
12
}
13

14
func (c *CreateTopicsResponse) encode(pe packetEncoder) error {
15
	if c.Version >= 2 {
16
		pe.putInt32(int32(c.ThrottleTime / time.Millisecond))
17
	}
18

19
	if err := pe.putArrayLength(len(c.TopicErrors)); err != nil {
20
		return err
21
	}
22
	for topic, topicError := range c.TopicErrors {
23
		if err := pe.putString(topic); err != nil {
24
			return err
25
		}
26
		if err := topicError.encode(pe, c.Version); err != nil {
27
			return err
28
		}
29
	}
30

31
	return nil
32
}
33

34
func (c *CreateTopicsResponse) decode(pd packetDecoder, version int16) (err error) {
35
	c.Version = version
36

37
	if version >= 2 {
38
		throttleTime, err := pd.getInt32()
39
		if err != nil {
40
			return err
41
		}
42
		c.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
43
	}
44

45
	n, err := pd.getArrayLength()
46
	if err != nil {
47
		return err
48
	}
49

50
	c.TopicErrors = make(map[string]*TopicError, n)
51
	for i := 0; i < n; i++ {
52
		topic, err := pd.getString()
53
		if err != nil {
54
			return err
55
		}
56
		c.TopicErrors[topic] = new(TopicError)
57
		if err := c.TopicErrors[topic].decode(pd, version); err != nil {
58
			return err
59
		}
60
	}
61

62
	return nil
63
}
64

65
func (c *CreateTopicsResponse) key() int16 {
66
	return 19
67
}
68

69
func (c *CreateTopicsResponse) version() int16 {
70
	return c.Version
71
}
72

73
func (c *CreateTopicsResponse) headerVersion() int16 {
74
	return 0
75
}
76

77
func (c *CreateTopicsResponse) requiredVersion() KafkaVersion {
78
	switch c.Version {
79
	case 2:
80
		return V1_0_0_0
81
	case 1:
82
		return V0_11_0_0
83
	default:
84
		return V0_10_1_0
85
	}
86
}
87

88
type TopicError struct {
89
	Err    KError
90
	ErrMsg *string
91
}
92

93
func (t *TopicError) Error() string {
94
	text := t.Err.Error()
95
	if t.ErrMsg != nil {
96
		text = fmt.Sprintf("%s - %s", text, *t.ErrMsg)
97
	}
98
	return text
99
}
100

101
func (t *TopicError) Unwrap() error {
102
	return t.Err
103
}
104

105
func (t *TopicError) encode(pe packetEncoder, version int16) error {
106
	pe.putInt16(int16(t.Err))
107

108
	if version >= 1 {
109
		if err := pe.putNullableString(t.ErrMsg); err != nil {
110
			return err
111
		}
112
	}
113

114
	return nil
115
}
116

117
func (t *TopicError) decode(pd packetDecoder, version int16) (err error) {
118
	kErr, err := pd.getInt16()
119
	if err != nil {
120
		return err
121
	}
122
	t.Err = KError(kErr)
123

124
	if version >= 1 {
125
		if t.ErrMsg, err = pd.getNullableString(); err != nil {
126
			return err
127
		}
128
	}
129

130
	return nil
131
}
132

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.