cubefs
131 строка · 2.3 Кб
1package sarama
2
3import (
4"fmt"
5"time"
6)
7
// CreateTopicsResponse holds the per-topic results carried by a CreateTopics
// response.
type CreateTopicsResponse struct {
	// Version is the wire-format version. decode records it, and encode uses
	// it to decide which fields are written.
	Version int16
	// ThrottleTime is written and read as an int32 millisecond count, and
	// only when Version >= 2.
	ThrottleTime time.Duration
	// TopicErrors maps each topic name to the result reported for it.
	TopicErrors map[string]*TopicError
}
13
14func (c *CreateTopicsResponse) encode(pe packetEncoder) error {
15if c.Version >= 2 {
16pe.putInt32(int32(c.ThrottleTime / time.Millisecond))
17}
18
19if err := pe.putArrayLength(len(c.TopicErrors)); err != nil {
20return err
21}
22for topic, topicError := range c.TopicErrors {
23if err := pe.putString(topic); err != nil {
24return err
25}
26if err := topicError.encode(pe, c.Version); err != nil {
27return err
28}
29}
30
31return nil
32}
33
34func (c *CreateTopicsResponse) decode(pd packetDecoder, version int16) (err error) {
35c.Version = version
36
37if version >= 2 {
38throttleTime, err := pd.getInt32()
39if err != nil {
40return err
41}
42c.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
43}
44
45n, err := pd.getArrayLength()
46if err != nil {
47return err
48}
49
50c.TopicErrors = make(map[string]*TopicError, n)
51for i := 0; i < n; i++ {
52topic, err := pd.getString()
53if err != nil {
54return err
55}
56c.TopicErrors[topic] = new(TopicError)
57if err := c.TopicErrors[topic].decode(pd, version); err != nil {
58return err
59}
60}
61
62return nil
63}
64
65func (c *CreateTopicsResponse) key() int16 {
66return 19
67}
68
69func (c *CreateTopicsResponse) version() int16 {
70return c.Version
71}
72
73func (c *CreateTopicsResponse) headerVersion() int16 {
74return 0
75}
76
77func (c *CreateTopicsResponse) requiredVersion() KafkaVersion {
78switch c.Version {
79case 2:
80return V1_0_0_0
81case 1:
82return V0_11_0_0
83default:
84return V0_10_1_0
85}
86}
87
// TopicError is the result reported for a single topic. It implements the
// error interface and exposes Err through Unwrap.
type TopicError struct {
	// Err is the error code, carried on the wire as an int16.
	Err KError
	// ErrMsg is an optional message. It is only encoded and decoded for
	// version >= 1 and may be nil.
	ErrMsg *string
}
92
93func (t *TopicError) Error() string {
94text := t.Err.Error()
95if t.ErrMsg != nil {
96text = fmt.Sprintf("%s - %s", text, *t.ErrMsg)
97}
98return text
99}
100
101func (t *TopicError) Unwrap() error {
102return t.Err
103}
104
105func (t *TopicError) encode(pe packetEncoder, version int16) error {
106pe.putInt16(int16(t.Err))
107
108if version >= 1 {
109if err := pe.putNullableString(t.ErrMsg); err != nil {
110return err
111}
112}
113
114return nil
115}
116
117func (t *TopicError) decode(pd packetDecoder, version int16) (err error) {
118kErr, err := pd.getInt16()
119if err != nil {
120return err
121}
122t.Err = KError(kErr)
123
124if version >= 1 {
125if t.ErrMsg, err = pd.getNullableString(); err != nil {
126return err
127}
128}
129
130return nil
131}
132