From 782ba1442937080e6916178eaa1671092a94cde0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miguel=20=C3=81ngel=20Ortu=C3=B1o?= Date: Wed, 9 Oct 2024 12:08:46 +0200 Subject: [PATCH] kgo: do not reuse decompression buffer for uncompressed batches (#8) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ref: https://github.com/twmb/franz-go/issues/823#issuecomment-2400842614 Signed-off-by: Miguel Ángel Ortuño --- pkg/kgo/compression.go | 3 +++ pkg/kgo/record_and_fetch.go | 6 +++++- pkg/kgo/source.go | 6 ++++-- 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/pkg/kgo/compression.go b/pkg/kgo/compression.go index 2e75346d..f26ae185 100644 --- a/pkg/kgo/compression.go +++ b/pkg/kgo/compression.go @@ -359,6 +359,9 @@ func (d *decompressor) getDecodedBuffer(src []byte, compCodec codecType, pool *p } func (d *decompressor) copyDecodedBuffer(decoded []byte, compCodec codecType, pool *pool.BucketedPool[byte]) []byte { + if pool == nil { + return append([]byte(nil), decoded...) + } if compCodec == codecSnappy { // We already know the actual size of the decoded buffer before decompression, // so there's no need to copy the buffer. diff --git a/pkg/kgo/record_and_fetch.go b/pkg/kgo/record_and_fetch.go index dbfb9451..c82d7539 100644 --- a/pkg/kgo/record_and_fetch.go +++ b/pkg/kgo/record_and_fetch.go @@ -178,9 +178,13 @@ type Record struct { // Once this method has been called, any reference to the passed record should be considered invalid by the caller, // as it may be reused as a result of future calls to the PollFetches/PollRecords method. func (r *Record) Reuse() { - if r.recordsPool != nil { + if r.rcRawRecordsBuffer != nil { r.rcRawRecordsBuffer.release() + } + if r.rcBatchBuffer != nil { r.rcBatchBuffer.release() + } + if r.recordsPool != nil { r.recordsPool.put(r) } } diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index 0f2d8962..51f87374 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -1583,7 +1583,9 @@ func processRecordBatch( } rawRecords := batch.Records - if compression := byte(batch.Attributes & 0x0007); compression != 0 { + + compression := byte(batch.Attributes & 0x0007) + if compression != 0 { var err error if rawRecords, err = decompressor.decompress(rawRecords, compression, o.DecompressBufferPool); err != nil { return 0, 0 // truncated batch @@ -1616,7 +1618,7 @@ func processRecordBatch( rcBatchBuff *rcBuffer[byte] rcRawRecordsBuff *rcBuffer[kmsg.Record] ) - if o.recordPool != nil { + if o.recordPool != nil && codecType(compression) != codecNone { rcBatchBuff = newRCBuffer(rawRecords, o.DecompressBufferPool) rcRawRecordsBuff = newRCBuffer(krecords, rawRecordsPool) }