Skip to content

Commit

Permalink
Merge pull request #2315 from k8scat/main
Browse files Browse the repository at this point in the history
feat(producer): expose ProducerMessage.byteSize() function
  • Loading branch information
dnwe authored Aug 11, 2022
2 parents 957781a + b9fc0d6 commit 820d5b2
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 5 deletions.
4 changes: 2 additions & 2 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ type ProducerMessage struct {

const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc.

func (m *ProducerMessage) byteSize(version int) int {
func (m *ProducerMessage) ByteSize(version int) int {
var size int
if version >= 2 {
size = maximumRecordOverhead
Expand Down Expand Up @@ -366,7 +366,7 @@ func (p *asyncProducer) dispatcher() {
p.returnError(msg, ConfigurationError("Producing headers requires Kafka at least v0.11"))
continue
}
if msg.byteSize(version) > p.conf.Producer.MaxMessageBytes {
if msg.ByteSize(version) > p.conf.Producer.MaxMessageBytes {
p.returnError(msg, ErrMessageSizeTooLarge)
continue
}
Expand Down
4 changes: 2 additions & 2 deletions produce_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,11 +235,11 @@ func (ps *produceSet) wouldOverflow(msg *ProducerMessage) bool {

switch {
// Would we overflow our maximum possible size-on-the-wire? 10KiB is arbitrary overhead for safety.
case ps.bufferBytes+msg.byteSize(version) >= int(MaxRequestSize-(10*1024)):
case ps.bufferBytes+msg.ByteSize(version) >= int(MaxRequestSize-(10*1024)):
return true
// Would we overflow the size-limit of a message-batch for this partition?
case ps.msgs[msg.Topic] != nil && ps.msgs[msg.Topic][msg.Partition] != nil &&
ps.msgs[msg.Topic][msg.Partition].bufferBytes+msg.byteSize(version) >= ps.parent.conf.Producer.MaxMessageBytes:
ps.msgs[msg.Topic][msg.Partition].bufferBytes+msg.ByteSize(version) >= ps.parent.conf.Producer.MaxMessageBytes:
return true
// Would we overflow simply in number of messages?
case ps.parent.conf.Producer.Flush.MaxMessages > 0 && ps.bufferCount >= ps.parent.conf.Producer.Flush.MaxMessages:
Expand Down
2 changes: 1 addition & 1 deletion produce_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestProduceSetAddingMessagesOverflowBytesLimit(t *testing.T) {

msg := &ProducerMessage{Key: StringEncoder(TestMessage), Value: StringEncoder(TestMessage)}

for ps.bufferBytes+msg.byteSize(2) < parent.conf.Producer.MaxMessageBytes {
for ps.bufferBytes+msg.ByteSize(2) < parent.conf.Producer.MaxMessageBytes {
if ps.wouldOverflow(msg) {
t.Error("set shouldn't fill up before 1000 bytes")
}
Expand Down

0 comments on commit 820d5b2

Please sign in to comment.