cubefs

Форк
0
/
create_topics_request.go 
178 строк · 3.3 Кб
1
package sarama
2

3
import (
4
	"time"
5
)
6

7
type CreateTopicsRequest struct {
8
	Version int16
9

10
	TopicDetails map[string]*TopicDetail
11
	Timeout      time.Duration
12
	ValidateOnly bool
13
}
14

15
func (c *CreateTopicsRequest) encode(pe packetEncoder) error {
16
	if err := pe.putArrayLength(len(c.TopicDetails)); err != nil {
17
		return err
18
	}
19
	for topic, detail := range c.TopicDetails {
20
		if err := pe.putString(topic); err != nil {
21
			return err
22
		}
23
		if err := detail.encode(pe); err != nil {
24
			return err
25
		}
26
	}
27

28
	pe.putInt32(int32(c.Timeout / time.Millisecond))
29

30
	if c.Version >= 1 {
31
		pe.putBool(c.ValidateOnly)
32
	}
33

34
	return nil
35
}
36

37
func (c *CreateTopicsRequest) decode(pd packetDecoder, version int16) (err error) {
38
	n, err := pd.getArrayLength()
39
	if err != nil {
40
		return err
41
	}
42

43
	c.TopicDetails = make(map[string]*TopicDetail, n)
44

45
	for i := 0; i < n; i++ {
46
		topic, err := pd.getString()
47
		if err != nil {
48
			return err
49
		}
50
		c.TopicDetails[topic] = new(TopicDetail)
51
		if err = c.TopicDetails[topic].decode(pd, version); err != nil {
52
			return err
53
		}
54
	}
55

56
	timeout, err := pd.getInt32()
57
	if err != nil {
58
		return err
59
	}
60
	c.Timeout = time.Duration(timeout) * time.Millisecond
61

62
	if version >= 1 {
63
		c.ValidateOnly, err = pd.getBool()
64
		if err != nil {
65
			return err
66
		}
67

68
		c.Version = version
69
	}
70

71
	return nil
72
}
73

74
func (c *CreateTopicsRequest) key() int16 {
75
	return 19
76
}
77

78
func (c *CreateTopicsRequest) version() int16 {
79
	return c.Version
80
}
81

82
func (r *CreateTopicsRequest) headerVersion() int16 {
83
	return 1
84
}
85

86
func (c *CreateTopicsRequest) requiredVersion() KafkaVersion {
87
	switch c.Version {
88
	case 2:
89
		return V1_0_0_0
90
	case 1:
91
		return V0_11_0_0
92
	default:
93
		return V0_10_1_0
94
	}
95
}
96

97
type TopicDetail struct {
98
	NumPartitions     int32
99
	ReplicationFactor int16
100
	ReplicaAssignment map[int32][]int32
101
	ConfigEntries     map[string]*string
102
}
103

104
func (t *TopicDetail) encode(pe packetEncoder) error {
105
	pe.putInt32(t.NumPartitions)
106
	pe.putInt16(t.ReplicationFactor)
107

108
	if err := pe.putArrayLength(len(t.ReplicaAssignment)); err != nil {
109
		return err
110
	}
111
	for partition, assignment := range t.ReplicaAssignment {
112
		pe.putInt32(partition)
113
		if err := pe.putInt32Array(assignment); err != nil {
114
			return err
115
		}
116
	}
117

118
	if err := pe.putArrayLength(len(t.ConfigEntries)); err != nil {
119
		return err
120
	}
121
	for configKey, configValue := range t.ConfigEntries {
122
		if err := pe.putString(configKey); err != nil {
123
			return err
124
		}
125
		if err := pe.putNullableString(configValue); err != nil {
126
			return err
127
		}
128
	}
129

130
	return nil
131
}
132

133
func (t *TopicDetail) decode(pd packetDecoder, version int16) (err error) {
134
	if t.NumPartitions, err = pd.getInt32(); err != nil {
135
		return err
136
	}
137
	if t.ReplicationFactor, err = pd.getInt16(); err != nil {
138
		return err
139
	}
140

141
	n, err := pd.getArrayLength()
142
	if err != nil {
143
		return err
144
	}
145

146
	if n > 0 {
147
		t.ReplicaAssignment = make(map[int32][]int32, n)
148
		for i := 0; i < n; i++ {
149
			replica, err := pd.getInt32()
150
			if err != nil {
151
				return err
152
			}
153
			if t.ReplicaAssignment[replica], err = pd.getInt32Array(); err != nil {
154
				return err
155
			}
156
		}
157
	}
158

159
	n, err = pd.getArrayLength()
160
	if err != nil {
161
		return err
162
	}
163

164
	if n > 0 {
165
		t.ConfigEntries = make(map[string]*string, n)
166
		for i := 0; i < n; i++ {
167
			configKey, err := pd.getString()
168
			if err != nil {
169
				return err
170
			}
171
			if t.ConfigEntries[configKey], err = pd.getNullableString(); err != nil {
172
				return err
173
			}
174
		}
175
	}
176

177
	return nil
178
}
179

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.