cubefs

Форк
0
112 строк · 2.4 Кб
1
package sarama
2

3
import "errors"
4

5
type MessageBlock struct {
6
	Offset int64
7
	Msg    *Message
8
}
9

10
// Messages convenience helper which returns either all the
11
// messages that are wrapped in this block
12
func (msb *MessageBlock) Messages() []*MessageBlock {
13
	if msb.Msg.Set != nil {
14
		return msb.Msg.Set.Messages
15
	}
16
	return []*MessageBlock{msb}
17
}
18

19
func (msb *MessageBlock) encode(pe packetEncoder) error {
20
	pe.putInt64(msb.Offset)
21
	pe.push(&lengthField{})
22
	err := msb.Msg.encode(pe)
23
	if err != nil {
24
		return err
25
	}
26
	return pe.pop()
27
}
28

29
func (msb *MessageBlock) decode(pd packetDecoder) (err error) {
30
	if msb.Offset, err = pd.getInt64(); err != nil {
31
		return err
32
	}
33

34
	lengthDecoder := acquireLengthField()
35
	defer releaseLengthField(lengthDecoder)
36

37
	if err = pd.push(lengthDecoder); err != nil {
38
		return err
39
	}
40

41
	msb.Msg = new(Message)
42
	if err = msb.Msg.decode(pd); err != nil {
43
		return err
44
	}
45

46
	if err = pd.pop(); err != nil {
47
		return err
48
	}
49

50
	return nil
51
}
52

53
type MessageSet struct {
54
	PartialTrailingMessage bool // whether the set on the wire contained an incomplete trailing MessageBlock
55
	OverflowMessage        bool // whether the set on the wire contained an overflow message
56
	Messages               []*MessageBlock
57
}
58

59
func (ms *MessageSet) encode(pe packetEncoder) error {
60
	for i := range ms.Messages {
61
		err := ms.Messages[i].encode(pe)
62
		if err != nil {
63
			return err
64
		}
65
	}
66
	return nil
67
}
68

69
func (ms *MessageSet) decode(pd packetDecoder) (err error) {
70
	ms.Messages = nil
71

72
	for pd.remaining() > 0 {
73
		magic, err := magicValue(pd)
74
		if err != nil {
75
			if errors.Is(err, ErrInsufficientData) {
76
				ms.PartialTrailingMessage = true
77
				return nil
78
			}
79
			return err
80
		}
81

82
		if magic > 1 {
83
			return nil
84
		}
85

86
		msb := new(MessageBlock)
87
		err = msb.decode(pd)
88
		if err == nil {
89
			ms.Messages = append(ms.Messages, msb)
90
		} else if errors.Is(err, ErrInsufficientData) {
91
			// As an optimization the server is allowed to return a partial message at the
92
			// end of the message set. Clients should handle this case. So we just ignore such things.
93
			if msb.Offset == -1 {
94
				// This is an overflow message caused by chunked down conversion
95
				ms.OverflowMessage = true
96
			} else {
97
				ms.PartialTrailingMessage = true
98
			}
99
			return nil
100
		} else {
101
			return err
102
		}
103
	}
104

105
	return nil
106
}
107

108
func (ms *MessageSet) addMessage(msg *Message) {
109
	block := new(MessageBlock)
110
	block.Msg = msg
111
	ms.Messages = append(ms.Messages, block)
112
}
113

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.