From b9fc0d6d7579d75bd3a4cdab0e7a2baf03ec9c71 Mon Sep 17 00:00:00 2001 From: K8sCat Date: Thu, 11 Aug 2022 19:57:23 +0800 Subject: [PATCH] update: expose ProducerMessage.byteSize() function Signed-off-by: K8sCat --- async_producer.go | 4 ++-- produce_set.go | 4 ++-- produce_set_test.go | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/async_producer.go b/async_producer.go index 4f7ece588..07141e045 100644 --- a/async_producer.go +++ b/async_producer.go @@ -232,7 +232,7 @@ type ProducerMessage struct { const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc. -func (m *ProducerMessage) byteSize(version int) int { +func (m *ProducerMessage) ByteSize(version int) int { var size int if version >= 2 { size = maximumRecordOverhead @@ -366,7 +366,7 @@ func (p *asyncProducer) dispatcher() { p.returnError(msg, ConfigurationError("Producing headers requires Kafka at least v0.11")) continue } - if msg.byteSize(version) > p.conf.Producer.MaxMessageBytes { + if msg.ByteSize(version) > p.conf.Producer.MaxMessageBytes { p.returnError(msg, ErrMessageSizeTooLarge) continue } diff --git a/produce_set.go b/produce_set.go index 9c70f8180..c3ba78f89 100644 --- a/produce_set.go +++ b/produce_set.go @@ -235,11 +235,11 @@ func (ps *produceSet) wouldOverflow(msg *ProducerMessage) bool { switch { // Would we overflow our maximum possible size-on-the-wire? 10KiB is arbitrary overhead for safety. - case ps.bufferBytes+msg.byteSize(version) >= int(MaxRequestSize-(10*1024)): + case ps.bufferBytes+msg.ByteSize(version) >= int(MaxRequestSize-(10*1024)): return true // Would we overflow the size-limit of a message-batch for this partition? case ps.msgs[msg.Topic] != nil && ps.msgs[msg.Topic][msg.Partition] != nil && - ps.msgs[msg.Topic][msg.Partition].bufferBytes+msg.byteSize(version) >= ps.parent.conf.Producer.MaxMessageBytes: + ps.msgs[msg.Topic][msg.Partition].bufferBytes+msg.ByteSize(version) >= ps.parent.conf.Producer.MaxMessageBytes: return true // Would we overflow simply in number of messages? case ps.parent.conf.Producer.Flush.MaxMessages > 0 && ps.bufferCount >= ps.parent.conf.Producer.Flush.MaxMessages: diff --git a/produce_set_test.go b/produce_set_test.go index 652d6d7d9..8f580c27d 100644 --- a/produce_set_test.go +++ b/produce_set_test.go @@ -73,7 +73,7 @@ func TestProduceSetAddingMessagesOverflowBytesLimit(t *testing.T) { msg := &ProducerMessage{Key: StringEncoder(TestMessage), Value: StringEncoder(TestMessage)} - for ps.bufferBytes+msg.byteSize(2) < parent.conf.Producer.MaxMessageBytes { + for ps.bufferBytes+msg.ByteSize(2) < parent.conf.Producer.MaxMessageBytes { if ps.wouldOverflow(msg) { t.Error("set shouldn't fill up before 1000 bytes") }