cubefs
112 строк · 2.4 Кб
1package sarama2
3import "errors"4
5type MessageBlock struct {6Offset int647Msg *Message8}
9
10// Messages convenience helper which returns either all the
11// messages that are wrapped in this block
12func (msb *MessageBlock) Messages() []*MessageBlock {13if msb.Msg.Set != nil {14return msb.Msg.Set.Messages15}16return []*MessageBlock{msb}17}
18
19func (msb *MessageBlock) encode(pe packetEncoder) error {20pe.putInt64(msb.Offset)21pe.push(&lengthField{})22err := msb.Msg.encode(pe)23if err != nil {24return err25}26return pe.pop()27}
28
29func (msb *MessageBlock) decode(pd packetDecoder) (err error) {30if msb.Offset, err = pd.getInt64(); err != nil {31return err32}33
34lengthDecoder := acquireLengthField()35defer releaseLengthField(lengthDecoder)36
37if err = pd.push(lengthDecoder); err != nil {38return err39}40
41msb.Msg = new(Message)42if err = msb.Msg.decode(pd); err != nil {43return err44}45
46if err = pd.pop(); err != nil {47return err48}49
50return nil51}
52
53type MessageSet struct {54PartialTrailingMessage bool // whether the set on the wire contained an incomplete trailing MessageBlock55OverflowMessage bool // whether the set on the wire contained an overflow message56Messages []*MessageBlock57}
58
59func (ms *MessageSet) encode(pe packetEncoder) error {60for i := range ms.Messages {61err := ms.Messages[i].encode(pe)62if err != nil {63return err64}65}66return nil67}
68
69func (ms *MessageSet) decode(pd packetDecoder) (err error) {70ms.Messages = nil71
72for pd.remaining() > 0 {73magic, err := magicValue(pd)74if err != nil {75if errors.Is(err, ErrInsufficientData) {76ms.PartialTrailingMessage = true77return nil78}79return err80}81
82if magic > 1 {83return nil84}85
86msb := new(MessageBlock)87err = msb.decode(pd)88if err == nil {89ms.Messages = append(ms.Messages, msb)90} else if errors.Is(err, ErrInsufficientData) {91// As an optimization the server is allowed to return a partial message at the92// end of the message set. Clients should handle this case. So we just ignore such things.93if msb.Offset == -1 {94// This is an overflow message caused by chunked down conversion95ms.OverflowMessage = true96} else {97ms.PartialTrailingMessage = true98}99return nil100} else {101return err102}103}104
105return nil106}
107
108func (ms *MessageSet) addMessage(msg *Message) {109block := new(MessageBlock)110block.Msg = msg111ms.Messages = append(ms.Messages, block)112}
113