cubefs

Форк
0
/
consumer_metadata_response.go 
82 строки · 1.8 Кб
1
package sarama
2

3
import (
4
	"net"
5
	"strconv"
6
)
7

8
// ConsumerMetadataResponse holds the response for a consumer group meta data requests
9
type ConsumerMetadataResponse struct {
10
	Err             KError
11
	Coordinator     *Broker
12
	CoordinatorID   int32  // deprecated: use Coordinator.ID()
13
	CoordinatorHost string // deprecated: use Coordinator.Addr()
14
	CoordinatorPort int32  // deprecated: use Coordinator.Addr()
15
}
16

17
func (r *ConsumerMetadataResponse) decode(pd packetDecoder, version int16) (err error) {
18
	tmp := new(FindCoordinatorResponse)
19

20
	if err := tmp.decode(pd, version); err != nil {
21
		return err
22
	}
23

24
	r.Err = tmp.Err
25

26
	r.Coordinator = tmp.Coordinator
27
	if tmp.Coordinator == nil {
28
		return nil
29
	}
30

31
	// this can all go away in 2.0, but we have to fill in deprecated fields to maintain
32
	// backwards compatibility
33
	host, portstr, err := net.SplitHostPort(r.Coordinator.Addr())
34
	if err != nil {
35
		return err
36
	}
37
	port, err := strconv.ParseInt(portstr, 10, 32)
38
	if err != nil {
39
		return err
40
	}
41
	r.CoordinatorID = r.Coordinator.ID()
42
	r.CoordinatorHost = host
43
	r.CoordinatorPort = int32(port)
44

45
	return nil
46
}
47

48
func (r *ConsumerMetadataResponse) encode(pe packetEncoder) error {
49
	if r.Coordinator == nil {
50
		r.Coordinator = new(Broker)
51
		r.Coordinator.id = r.CoordinatorID
52
		r.Coordinator.addr = net.JoinHostPort(r.CoordinatorHost, strconv.Itoa(int(r.CoordinatorPort)))
53
	}
54

55
	tmp := &FindCoordinatorResponse{
56
		Version:     0,
57
		Err:         r.Err,
58
		Coordinator: r.Coordinator,
59
	}
60

61
	if err := tmp.encode(pe); err != nil {
62
		return err
63
	}
64

65
	return nil
66
}
67

68
func (r *ConsumerMetadataResponse) key() int16 {
69
	return 10
70
}
71

72
func (r *ConsumerMetadataResponse) version() int16 {
73
	return 0
74
}
75

76
func (r *ConsumerMetadataResponse) headerVersion() int16 {
77
	return 0
78
}
79

80
func (r *ConsumerMetadataResponse) requiredVersion() KafkaVersion {
81
	return V0_8_2_0
82
}
83

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.