package sarama

import (
	"encoding/binary"
	"errors"
	"time"
)
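
// partitionSet holds the messages buffered for a single topic/partition,
// both as the original *ProducerMessage values and in their encoded wire
// form (recordsToSend), along with a running count of the buffered bytes.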
type partitionSet struct {
	msgs          []*ProducerMessage
	recordsToSend Records
	bufferBytes   int
}
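
// produceSet groups buffered messages by topic and partition and tracks the
// total bytes and message count across all partitions. The producer ID and
// epoch are carried along for idempotent production. A rough usage sketch
// (the actual driver is the async producer's broker loop):
//
//	set := newProduceSet(parent)
//	if set.wouldOverflow(msg) {
//		// flush the current set first, then start a new one
//	}
//	_ = set.add(msg)
//	if set.readyToFlush() {
//		req := set.buildRequest()
//		// send req to the broker
//	}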
type produceSet struct {
	parent        *asyncProducer
	msgs          map[string]map[int32]*partitionSet
	producerID    int64
	producerEpoch int16

	bufferBytes int
	bufferCount int
}
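
// newProduceSet returns an empty produceSet bound to the given parent
// producer, fetching the producer ID and epoch from its transaction manager.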
func newProduceSet(parent *asyncProducer) *produceSet {
	pid, epoch := parent.txnmgr.getProducerID()
	return &produceSet{
		msgs:          make(map[string]map[int32]*partitionSet),
		parent:        parent,
		producerID:    pid,
		producerEpoch: epoch,
	}
}
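
// add encodes the message's key and value, appends the message to the
// partitionSet for its topic/partition (creating it on first use: a
// RecordBatch on protocol >= 0.11, a legacy MessageSet otherwise), and
// updates the byte and message counters. Encoding failures are returned
// before the set is mutated.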
func (ps *produceSet) add(msg *ProducerMessage) error {
	var err error
	var key, val []byte

	if msg.Key != nil {
		if key, err = msg.Key.Encode(); err != nil {
			return err
		}
	}

	if msg.Value != nil {
		if val, err = msg.Value.Encode(); err != nil {
			return err
		}
	}

	timestamp := msg.Timestamp
	if timestamp.IsZero() {
		timestamp = time.Now()
	}
	timestamp = timestamp.Truncate(time.Millisecond)

	partitions := ps.msgs[msg.Topic]
	if partitions == nil {
		partitions = make(map[int32]*partitionSet)
		ps.msgs[msg.Topic] = partitions
	}

	var size int

	set := partitions[msg.Partition]
	if set == nil {
		if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
			batch := &RecordBatch{
				FirstTimestamp:   timestamp,
				Version:          2,
				Codec:            ps.parent.conf.Producer.Compression,
				CompressionLevel: ps.parent.conf.Producer.CompressionLevel,
				ProducerID:       ps.producerID,
				ProducerEpoch:    ps.producerEpoch,
			}
			if ps.parent.conf.Producer.Idempotent {
				batch.FirstSequence = msg.sequenceNumber
			}
			set = &partitionSet{recordsToSend: newDefaultRecords(batch)}
			size = recordBatchOverhead
		} else {
			set = &partitionSet{recordsToSend: newLegacyRecords(new(MessageSet))}
		}
		partitions[msg.Partition] = set
	}

	if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
		if ps.parent.conf.Producer.Idempotent && msg.sequenceNumber < set.recordsToSend.RecordBatch.FirstSequence {
			return errors.New("assertion failed: message out of sequence added to a batch")
		}
	}

	// Past this point we can't return an error, because we've already added the message to the set.
	set.msgs = append(set.msgs, msg)

	if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
		// We are being conservative here to avoid having to prep encode the record
		size += maximumRecordOverhead
		rec := &Record{
			Key:            key,
			Value:          val,
			TimestampDelta: timestamp.Sub(set.recordsToSend.RecordBatch.FirstTimestamp),
		}
		size += len(key) + len(val)
		if len(msg.Headers) > 0 {
			rec.Headers = make([]*RecordHeader, len(msg.Headers))
			for i := range msg.Headers {
				rec.Headers[i] = &msg.Headers[i]
				size += len(rec.Headers[i].Key) + len(rec.Headers[i].Value) + 2*binary.MaxVarintLen32
			}
		}
		set.recordsToSend.RecordBatch.addRecord(rec)
	} else {
		msgToSend := &Message{Codec: CompressionNone, Key: key, Value: val}
		if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
			msgToSend.Timestamp = timestamp
			msgToSend.Version = 1
		}
		set.recordsToSend.MsgSet.addMessage(msgToSend)
		size = producerMessageOverhead + len(key) + len(val)
	}

	set.bufferBytes += size
	ps.bufferBytes += size
	ps.bufferCount++

	return nil
}
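
// buildRequest assembles a ProduceRequest from everything buffered in the
// set, choosing the request version from the configured Kafka version and
// compressing legacy message sets into a single wrapper message when a
// codec is configured.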
func (ps *produceSet) buildRequest() *ProduceRequest {
	req := &ProduceRequest{
		RequiredAcks: ps.parent.conf.Producer.RequiredAcks,
		Timeout:      int32(ps.parent.conf.Producer.Timeout / time.Millisecond),
	}
	if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
		req.Version = 2
	}
	if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
		req.Version = 3
	}

	if ps.parent.conf.Producer.Compression == CompressionZSTD && ps.parent.conf.Version.IsAtLeast(V2_1_0_0) {
		req.Version = 7
	}

	for topic, partitionSets := range ps.msgs {
		for partition, set := range partitionSets {
			if req.Version >= 3 {
				// If the API version we're hitting is 3 or greater, we need to calculate
				// offsets for each record in the batch relative to FirstOffset.
				// Additionally, we must set LastOffsetDelta to the value of the last offset
				// in the batch. Since the OffsetDelta of the first record is 0, we know that the
				// final record of any batch will have an offset of (# of records in batch) - 1.
				// (See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets
				// under the RecordBatch section for details.)
				rb := set.recordsToSend.RecordBatch
				if len(rb.Records) > 0 {
					rb.LastOffsetDelta = int32(len(rb.Records) - 1)
					for i, record := range rb.Records {
						record.OffsetDelta = int64(i)
					}
				}
				req.AddBatch(topic, partition, rb)
				continue
			}
			if ps.parent.conf.Producer.Compression == CompressionNone {
				req.AddSet(topic, partition, set.recordsToSend.MsgSet)
			} else {
				// When compression is enabled, the entire set for each partition is compressed
				// and sent as the payload of a single fake "message" with the appropriate codec
				// set and no key. When the server sees a message with a compression codec, it
				// decompresses the payload and treats the result as its message set.

				if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
					// If our version is 0.10 or later, assign relative offsets
					// to the inner messages. This lets the broker avoid
					// recompressing the message set.
					// (See https://cwiki.apache.org/confluence/display/KAFKA/KIP-31+-+Move+to+relative+offsets+in+compressed+message+sets
					// for details on relative offsets.)
					for i, msg := range set.recordsToSend.MsgSet.Messages {
						msg.Offset = int64(i)
					}
				}
				payload, err := encode(set.recordsToSend.MsgSet, ps.parent.conf.MetricRegistry)
				if err != nil {
					Logger.Println(err) // if this happens, it's basically our fault.
					panic(err)
				}
				compMsg := &Message{
					Codec:            ps.parent.conf.Producer.Compression,
					CompressionLevel: ps.parent.conf.Producer.CompressionLevel,
					Key:              nil,
					Value:            payload,
					Set:              set.recordsToSend.MsgSet, // Provide the underlying message set for accurate metrics
				}
				if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
					compMsg.Version = 1
					compMsg.Timestamp = set.recordsToSend.MsgSet.Messages[0].Msg.Timestamp
				}
				req.AddMessage(topic, partition, compMsg)
			}
		}
	}

	return req
}
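
// eachPartition invokes cb once for every buffered topic/partition pair.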
func (ps *produceSet) eachPartition(cb func(topic string, partition int32, pSet *partitionSet)) {
	for topic, partitionSets := range ps.msgs {
		for partition, set := range partitionSets {
			cb(topic, partition, set)
		}
	}
}
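
// dropPartition removes the buffered messages for the given topic/partition
// from the set, adjusts the byte and message counters accordingly, and
// returns the removed messages (nil if nothing was buffered).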
func (ps *produceSet) dropPartition(topic string, partition int32) []*ProducerMessage {
	if ps.msgs[topic] == nil {
		return nil
	}
	set := ps.msgs[topic][partition]
	if set == nil {
		return nil
	}
	ps.bufferBytes -= set.bufferBytes
	ps.bufferCount -= len(set.msgs)
	delete(ps.msgs[topic], partition)
	return set.msgs
}
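
// wouldOverflow reports whether adding msg would exceed the maximum request
// size, the per-partition batch size limit, or the configured maximum
// message count.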
func (ps *produceSet) wouldOverflow(msg *ProducerMessage) bool {
	version := 1
	if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
		version = 2
	}

	switch {
	// Would we overflow our maximum possible size-on-the-wire? 10KiB is arbitrary overhead for safety.
	case ps.bufferBytes+msg.byteSize(version) >= int(MaxRequestSize-(10*1024)):
		return true
	// Would we overflow the size-limit of a message-batch for this partition?
	case ps.msgs[msg.Topic] != nil && ps.msgs[msg.Topic][msg.Partition] != nil &&
		ps.msgs[msg.Topic][msg.Partition].bufferBytes+msg.byteSize(version) >= ps.parent.conf.Producer.MaxMessageBytes:
		return true
	// Would we overflow simply in number of messages?
	case ps.parent.conf.Producer.Flush.MaxMessages > 0 && ps.bufferCount >= ps.parent.conf.Producer.Flush.MaxMessages:
		return true
	default:
		return false
	}
}
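
// readyToFlush reports whether the buffered data has hit one of the
// configured flush triggers (or whether no triggers are configured at all,
// in which case any non-empty set flushes immediately).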
func (ps *produceSet) readyToFlush() bool {
	switch {
	// If we don't have any messages, nothing else matters
	case ps.empty():
		return false
	// If all three config values are 0, we always flush as-fast-as-possible
	case ps.parent.conf.Producer.Flush.Frequency == 0 && ps.parent.conf.Producer.Flush.Bytes == 0 && ps.parent.conf.Producer.Flush.Messages == 0:
		return true
	// If we've passed the message trigger-point
	case ps.parent.conf.Producer.Flush.Messages > 0 && ps.bufferCount >= ps.parent.conf.Producer.Flush.Messages:
		return true
	// If we've passed the byte trigger-point
	case ps.parent.conf.Producer.Flush.Bytes > 0 && ps.bufferBytes >= ps.parent.conf.Producer.Flush.Bytes:
		return true
	default:
		return false
	}
}
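
// empty reports whether the set contains no messages.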
func (ps *produceSet) empty() bool {
	return ps.bufferCount == 0
}