cubefs

Форк
0
/
create_partitions_request.go 
125 строк · 2.3 Кб
1
package sarama
2

3
import "time"
4

5
type CreatePartitionsRequest struct {
6
	TopicPartitions map[string]*TopicPartition
7
	Timeout         time.Duration
8
	ValidateOnly    bool
9
}
10

11
func (c *CreatePartitionsRequest) encode(pe packetEncoder) error {
12
	if err := pe.putArrayLength(len(c.TopicPartitions)); err != nil {
13
		return err
14
	}
15

16
	for topic, partition := range c.TopicPartitions {
17
		if err := pe.putString(topic); err != nil {
18
			return err
19
		}
20
		if err := partition.encode(pe); err != nil {
21
			return err
22
		}
23
	}
24

25
	pe.putInt32(int32(c.Timeout / time.Millisecond))
26

27
	pe.putBool(c.ValidateOnly)
28

29
	return nil
30
}
31

32
func (c *CreatePartitionsRequest) decode(pd packetDecoder, version int16) (err error) {
33
	n, err := pd.getArrayLength()
34
	if err != nil {
35
		return err
36
	}
37
	c.TopicPartitions = make(map[string]*TopicPartition, n)
38
	for i := 0; i < n; i++ {
39
		topic, err := pd.getString()
40
		if err != nil {
41
			return err
42
		}
43
		c.TopicPartitions[topic] = new(TopicPartition)
44
		if err := c.TopicPartitions[topic].decode(pd, version); err != nil {
45
			return err
46
		}
47
	}
48

49
	timeout, err := pd.getInt32()
50
	if err != nil {
51
		return err
52
	}
53
	c.Timeout = time.Duration(timeout) * time.Millisecond
54

55
	if c.ValidateOnly, err = pd.getBool(); err != nil {
56
		return err
57
	}
58

59
	return nil
60
}
61

62
func (r *CreatePartitionsRequest) key() int16 {
63
	return 37
64
}
65

66
func (r *CreatePartitionsRequest) version() int16 {
67
	return 0
68
}
69

70
func (r *CreatePartitionsRequest) headerVersion() int16 {
71
	return 1
72
}
73

74
func (r *CreatePartitionsRequest) requiredVersion() KafkaVersion {
75
	return V1_0_0_0
76
}
77

78
type TopicPartition struct {
79
	Count      int32
80
	Assignment [][]int32
81
}
82

83
func (t *TopicPartition) encode(pe packetEncoder) error {
84
	pe.putInt32(t.Count)
85

86
	if len(t.Assignment) == 0 {
87
		pe.putInt32(-1)
88
		return nil
89
	}
90

91
	if err := pe.putArrayLength(len(t.Assignment)); err != nil {
92
		return err
93
	}
94

95
	for _, assign := range t.Assignment {
96
		if err := pe.putInt32Array(assign); err != nil {
97
			return err
98
		}
99
	}
100

101
	return nil
102
}
103

104
func (t *TopicPartition) decode(pd packetDecoder, version int16) (err error) {
105
	if t.Count, err = pd.getInt32(); err != nil {
106
		return err
107
	}
108

109
	n, err := pd.getInt32()
110
	if err != nil {
111
		return err
112
	}
113
	if n <= 0 {
114
		return nil
115
	}
116
	t.Assignment = make([][]int32, n)
117

118
	for i := 0; i < int(n); i++ {
119
		if t.Assignment[i], err = pd.getInt32Array(); err != nil {
120
			return err
121
		}
122
	}
123

124
	return nil
125
}
126

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.