From 245ed65cd26a46b69bbc3c421ca28674d441d9fb Mon Sep 17 00:00:00 2001
From: gotjosh <josue.abreu@gmail.com>
Date: Tue, 8 Oct 2024 20:00:41 +0100
Subject: [PATCH] Revert #9511 (#9566) (#9568)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This reverts #9511 because it causes panics in ingesters.

The details are:

```
panic: runtime error: invalid memory address or nil pointer dereference
	panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x30 pc=0x13b2350]

	$GOROOT/src/runtime/panic.go:785 +0x124
github.com/twmb/franz-go/pkg/kgo/internal/pool.(*BucketedPool[...]).Get(0x0?, 0x0?)
```
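
The trace shows `Get` invoked on a nil `*BucketedPool` receiver (`0x0?`), which points at a fetch path reaching decompression without the pool that `cfg.validate()` builds. A minimal sketch of that failure mode, assuming an uninitialized pool as the trigger and using a trimmed copy of the `internal/pool` type removed below:

```go
package main

import "sync"

// Trimmed copy of the deleted internal/pool.BucketedPool, kept only
// to illustrate the nil-receiver panic; not the full implementation.
type BucketedPool[T any] struct {
	buckets []sync.Pool
	sizes   []int
	make    func(int) []T
}

// Get walks p.sizes looking for a fitting bucket; with a nil receiver
// the first field access dereferences nil and panics.
func (p *BucketedPool[T]) Get(size int) []T {
	for i, bktSize := range p.sizes {
		if size > bktSize {
			continue
		}
		if buff := p.buckets[i].Get(); buff != nil {
			return buff.([]T)
		}
		return p.make(bktSize)
	}
	return p.make(size)
}

func main() {
	// Hypothetical repro: a *BucketedPool[byte] that was never
	// constructed is a typed nil; calling Get on it reproduces the
	// "invalid memory address or nil pointer dereference" above.
	var p *BucketedPool[byte]
	_ = p.Get(4096)
}
```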

Signed-off-by: gotjosh <josue.abreu@gmail.com>
(cherry picked from commit ab3e627b1da134f33c06dd09ee4402a2eaad68f1)
---
 go.mod                                        |   7 +-
 go.sum                                        |   4 +-
 .../twmb/franz-go/pkg/kgo/compression.go      |  57 +------
 .../twmb/franz-go/pkg/kgo/config.go           |  22 ---
 .../pkg/kgo/internal/pool/bucketed_pool.go    |  94 ------------
 .../twmb/franz-go/pkg/kgo/record_and_fetch.go |  34 -----
 .../twmb/franz-go/pkg/kgo/source.go           | 140 ++++--------------
 vendor/modules.txt                            |   5 +-
 8 files changed, 41 insertions(+), 322 deletions(-)
 delete mode 100644 vendor/github.com/twmb/franz-go/pkg/kgo/internal/pool/bucketed_pool.go

diff --git a/go.mod b/go.mod
index be37a419c9b..21baee46cdb 100644
--- a/go.mod
+++ b/go.mod
@@ -315,11 +315,8 @@ replace github.com/opentracing-contrib/go-grpc => github.com/charleskorn/go-grpc
 // Replacing prometheus/alertmanager with our fork.
 replace github.com/prometheus/alertmanager => github.com/grafana/prometheus-alertmanager v0.25.1-0.20240924175849-b8b7c2c74eb6
 
-// Replacing with a fork commit based on v1.17.1 having cherry-picked the following PRs:
-// - https://github.com/grafana/franz-go/pull/1
-// - https://github.com/grafana/franz-go/pull/3
-// - https://github.com/grafana/franz-go/pull/4
-replace github.com/twmb/franz-go => github.com/grafana/franz-go v0.0.0-20241003103709-f8f76a439eb4
+// Replacing with a fork commit based on v1.17.1 with https://github.com/twmb/franz-go/pull/803 cherry-picked.
+replace github.com/twmb/franz-go => github.com/dimitarvdimitrov/franz-go v0.0.0-20240904145554-ceadc28d3bd9
 
 // Pin Google GRPC to v1.65.0 as v1.66.0 has API changes and also potentially performance regressions.
 // Following https://github.com/grafana/dskit/pull/581
diff --git a/go.sum b/go.sum
index 1e1864da1a2..295a086a6dd 100644
--- a/go.sum
+++ b/go.sum
@@ -947,6 +947,8 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/r
 github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
 github.com/digitalocean/godo v1.121.0 h1:ilXiHuEnhbJs2fmFEPX0r/QQ6KfiOIMAhJN3f8NiCfI=
 github.com/digitalocean/godo v1.121.0/go.mod h1:WQVH83OHUy6gC4gXpEVQKtxTd4L5oCp+5OialidkPLY=
+github.com/dimitarvdimitrov/franz-go v0.0.0-20240904145554-ceadc28d3bd9 h1:jszPVGeTr25QTJ/jWiT7eXnabc4R4itChxUVFSCLjRQ=
+github.com/dimitarvdimitrov/franz-go v0.0.0-20240904145554-ceadc28d3bd9/go.mod h1:NreRdJ2F7dziDY/m6VyspWd6sNxHKXdMZI42UfQ3GXM=
 github.com/distribution/reference v0.5.0 h1:/FUIFXtfc/x2gpa5/VGfiGLuOIdYa1t65IKK2OFGvA0=
 github.com/distribution/reference v0.5.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E=
 github.com/dlclark/regexp2 v1.11.0 h1:G/nrcoOa7ZXlpoa/91N3X7mM3r8eIlMBBJZvsz/mxKI=
@@ -1254,8 +1256,6 @@ github.com/grafana/dskit v0.0.0-20240925193654-7c41a4057319 h1:KACpOOTqA4WqyyKF2
 github.com/grafana/dskit v0.0.0-20240925193654-7c41a4057319/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ=
 github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc h1:BW+LjKJDz0So5LI8UZfW5neWeKpSkWqhmGjQFzcFfLM=
 github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc/go.mod h1:JVmqPBe8A/pZWwRoJW5ZjyALeY5OXMzPl7LrVXOdZAI=
-github.com/grafana/franz-go v0.0.0-20241003103709-f8f76a439eb4 h1:7/CJa4ilczGHLjULGJFxRFAGsnaN33YIJEqpm45TUYs=
-github.com/grafana/franz-go v0.0.0-20241003103709-f8f76a439eb4/go.mod h1:NreRdJ2F7dziDY/m6VyspWd6sNxHKXdMZI42UfQ3GXM=
 github.com/grafana/goautoneg v0.0.0-20240607115440-f335c04c58ce h1:WI1olbgS+sEl77qxEYbmt9TgRUz7iLqmjh8lYPpGlKQ=
 github.com/grafana/goautoneg v0.0.0-20240607115440-f335c04c58ce/go.mod h1:GFAN9Jn9t1cX7sNfc6ZoFyc4f7i8jtm3SajrWdZM2EE=
 github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56 h1:X8IKQ0wu40wpvYcKfBcc5T4QnhdQjUhtUtB/1CY89lE=
diff --git a/vendor/github.com/twmb/franz-go/pkg/kgo/compression.go b/vendor/github.com/twmb/franz-go/pkg/kgo/compression.go
index 1adbe69a074..81d9d8a7e3b 100644
--- a/vendor/github.com/twmb/franz-go/pkg/kgo/compression.go
+++ b/vendor/github.com/twmb/franz-go/pkg/kgo/compression.go
@@ -12,8 +12,6 @@ import (
 	"github.com/klauspost/compress/s2"
 	"github.com/klauspost/compress/zstd"
 	"github.com/pierrec/lz4/v4"
-
-	"github.com/twmb/franz-go/pkg/kgo/internal/pool"
 )
 
 var byteBuffers = sync.Pool{New: func() any { return bytes.NewBuffer(make([]byte, 8<<10)) }}
@@ -268,23 +266,15 @@ type zstdDecoder struct {
 	inner *zstd.Decoder
 }
 
-func (d *decompressor) decompress(src []byte, codec byte, pool *pool.BucketedPool[byte]) ([]byte, error) {
+func (d *decompressor) decompress(src []byte, codec byte) ([]byte, error) {
 	// Early return in case there is no compression
 	compCodec := codecType(codec)
 	if compCodec == codecNone {
 		return src, nil
 	}
-
-	out, buf, err := d.getDecodedBuffer(src, compCodec, pool)
-	if err != nil {
-		return nil, err
-	}
-	defer func() {
-		if compCodec == codecSnappy {
-			return
-		}
-		pool.Put(buf)
-	}()
+	out := byteBuffers.Get().(*bytes.Buffer)
+	out.Reset()
+	defer byteBuffers.Put(out)
 
 	switch compCodec {
 	case codecGzip:
@@ -296,7 +286,7 @@ func (d *decompressor) decompress(src []byte, codec byte, pool *pool.BucketedPoo
 		if _, err := io.Copy(out, ungz); err != nil {
 			return nil, err
 		}
-		return d.copyDecodedBuffer(out.Bytes(), compCodec, pool), nil
+		return append([]byte(nil), out.Bytes()...), nil
 	case codecSnappy:
 		if len(src) > 16 && bytes.HasPrefix(src, xerialPfx) {
 			return xerialDecode(src)
@@ -305,7 +295,7 @@ func (d *decompressor) decompress(src []byte, codec byte, pool *pool.BucketedPoo
 		if err != nil {
 			return nil, err
 		}
-		return d.copyDecodedBuffer(decoded, compCodec, pool), nil
+		return append([]byte(nil), decoded...), nil
 	case codecLZ4:
 		unlz4 := d.unlz4Pool.Get().(*lz4.Reader)
 		defer d.unlz4Pool.Put(unlz4)
@@ -313,7 +303,7 @@ func (d *decompressor) decompress(src []byte, codec byte, pool *pool.BucketedPoo
 		if _, err := io.Copy(out, unlz4); err != nil {
 			return nil, err
 		}
-		return d.copyDecodedBuffer(out.Bytes(), compCodec, pool), nil
+		return append([]byte(nil), out.Bytes()...), nil
 	case codecZstd:
 		unzstd := d.unzstdPool.Get().(*zstdDecoder)
 		defer d.unzstdPool.Put(unzstd)
@@ -321,43 +311,12 @@ func (d *decompressor) decompress(src []byte, codec byte, pool *pool.BucketedPoo
 		if err != nil {
 			return nil, err
 		}
-		return d.copyDecodedBuffer(decoded, compCodec, pool), nil
+		return append([]byte(nil), decoded...), nil
 	default:
 		return nil, errors.New("unknown compression codec")
 	}
 }
 
-func (d *decompressor) getDecodedBuffer(src []byte, compCodec codecType, pool *pool.BucketedPool[byte]) (*bytes.Buffer, []byte, error) {
-	var (
-		decodedBufSize int
-		err error
-	)
-	switch compCodec {
-	case codecSnappy:
-		decodedBufSize, err = s2.DecodedLen(src)
-		if err != nil {
-			return nil, nil, err
-		}
-
-	default:
-		// Make a guess at the output size.
-		decodedBufSize = len(src) * 2
-	}
-	buf := pool.Get(decodedBufSize)[:0]
-
-	return bytes.NewBuffer(buf), buf, nil
-}
-
-func (d *decompressor) copyDecodedBuffer(decoded []byte, compCodec codecType, pool *pool.BucketedPool[byte]) []byte {
-	if compCodec == codecSnappy {
-		// We already know the actual size of the decoded buffer before decompression,
-		// so there's no need to copy the buffer.
-		return decoded
-	}
-	out := pool.Get(len(decoded))
-	return append(out[:0], decoded...)
-}
-
 var xerialPfx = []byte{130, 83, 78, 65, 80, 80, 89, 0}
 
 var errMalformedXerial = errors.New("malformed xerial framing")
diff --git a/vendor/github.com/twmb/franz-go/pkg/kgo/config.go b/vendor/github.com/twmb/franz-go/pkg/kgo/config.go
index dabaf03f618..92ebeaa3905 100644
--- a/vendor/github.com/twmb/franz-go/pkg/kgo/config.go
+++ b/vendor/github.com/twmb/franz-go/pkg/kgo/config.go
@@ -16,8 +16,6 @@ import (
 	"github.com/twmb/franz-go/pkg/kmsg"
 	"github.com/twmb/franz-go/pkg/kversion"
 	"github.com/twmb/franz-go/pkg/sasl"
-
-	"github.com/twmb/franz-go/pkg/kgo/internal/pool"
 )
 
 // Opt is an option to configure a client.
@@ -153,9 +151,6 @@ type cfg struct {
 	partitions map[string]map[int32]Offset // partitions to directly consume from
 	regex      bool
 
-	recordsPool          *recordsPool
-	decompressBufferPool *pool.BucketedPool[byte]
-
 	////////////////////////////
 	// CONSUMER GROUP SECTION //
 	////////////////////////////
@@ -394,11 +389,6 @@ func (cfg *cfg) validate() error {
 	}
 	cfg.hooks = processedHooks
 
-	// Assume a 2x compression ratio.
-	maxDecompressedBatchSize := int(cfg.maxBytes.load()) * 2
-	cfg.decompressBufferPool = pool.NewBucketedPool[byte](4096, maxDecompressedBatchSize, 2, func(sz int) []byte {
-		return make([]byte, sz)
-	})
 	return nil
 }
 
@@ -1357,18 +1347,6 @@ func ConsumeRegex() ConsumerOpt {
 	return consumerOpt{func(cfg *cfg) { cfg.regex = true }}
 }
 
-// EnableRecordsPool sets the client to obtain the *kgo.Record objects from a pool,
-// in order to minimize the number of allocations.
-//
-// By enabling this option, the records returned by PollFetches/PollRecords
-// can be sent back to the pool via ReuseRecords method in order to be recycled.
-//
-// This option is particularly useful for use cases where the volume of generated records is very high,
-// as it can negatively impact performance due to the extra GC overhead.
-func EnableRecordsPool() ConsumerOpt {
-	return consumerOpt{func(cfg *cfg) { cfg.recordsPool = newRecordsPool() }}
-}
-
 // DisableFetchSessions sets the client to not use fetch sessions (Kafka 1.0+).
 //
 // A "fetch session" is is a way to reduce bandwidth for fetch requests &
diff --git a/vendor/github.com/twmb/franz-go/pkg/kgo/internal/pool/bucketed_pool.go b/vendor/github.com/twmb/franz-go/pkg/kgo/internal/pool/bucketed_pool.go
deleted file mode 100644
index b5c435a55e2..00000000000
--- a/vendor/github.com/twmb/franz-go/pkg/kgo/internal/pool/bucketed_pool.go
+++ /dev/null
@@ -1,94 +0,0 @@
-// Copyright 2017 The Prometheus Authors
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package pool
-
-import (
-	"sync"
-)
-
-// BucketedPool is a bucketed pool for variably sized slices.
-type BucketedPool[T any] struct {
-	buckets []sync.Pool
-	sizes   []int
-	// make is the function used to create an empty slice when none exist yet.
-	make func(int) []T
-}
-
-// NewBucketedPool returns a new BucketedPool with size buckets for minSize to maxSize
-// increasing by the given factor.
-func NewBucketedPool[T any](minSize, maxSize int, factor float64, makeFunc func(int) []T) *BucketedPool[T] {
-	if minSize < 1 {
-		panic("invalid minimum pool size")
-	}
-	if maxSize < 1 {
-		panic("invalid maximum pool size")
-	}
-	if factor < 1 {
-		panic("invalid factor")
-	}
-
-	var sizes []int
-
-	for s := minSize; s <= maxSize; s = int(float64(s) * factor) {
-		sizes = append(sizes, s)
-	}
-
-	p := &BucketedPool[T]{
-		buckets: make([]sync.Pool, len(sizes)),
-		sizes:   sizes,
-		make:    makeFunc,
-	}
-	return p
-}
-
-// Get returns a new slice with capacity greater than or equal to size.
-func (p *BucketedPool[T]) Get(size int) []T {
-	for i, bktSize := range p.sizes {
-		if size > bktSize {
-			continue
-		}
-		buff := p.buckets[i].Get()
-		if buff == nil {
-			buff = p.make(bktSize)
-		}
-		return buff.([]T)
-	}
-	return p.make(size)
-}
-
-// Put adds a slice to the right bucket in the pool.
-// If the slice does not belong to any bucket in the pool, it is ignored.
-func (p *BucketedPool[T]) Put(s []T) {
-	sCap := cap(s)
-	if sCap < p.sizes[0] {
-		return
-	}
-
-	for i, size := range p.sizes {
-		if sCap > size {
-			continue
-		}
-
-		if sCap == size {
-			// Buffer is exactly the minimum size for this bucket. Add it to this bucket.
-			p.buckets[i].Put(s)
-		} else {
-			// Buffer belongs in previous bucket.
-			p.buckets[i-1].Put(s)
-		}
-		return
-	}
-}
-
-
diff --git a/vendor/github.com/twmb/franz-go/pkg/kgo/record_and_fetch.go b/vendor/github.com/twmb/franz-go/pkg/kgo/record_and_fetch.go
index dbfb94510a6..4f1ebe6f524 100644
--- a/vendor/github.com/twmb/franz-go/pkg/kgo/record_and_fetch.go
+++ b/vendor/github.com/twmb/franz-go/pkg/kgo/record_and_fetch.go
@@ -6,8 +6,6 @@ import (
 	"reflect"
 	"time"
 	"unsafe"
-
-	"github.com/twmb/franz-go/pkg/kmsg"
 )
 
 // RecordHeader contains extra information that can be sent with Records.
@@ -151,38 +149,6 @@ type Record struct {
 	// producer hooks. It can also be set in a consumer hook to propagate
 	// enrichment to consumer clients.
 	Context context.Context
-
-	// recordsPool is the pool that this record was fetched from, if any.
-	//
-	// When reused, record is returned to this pool.
-	recordsPool *recordsPool
-
-	// rcBatchBuffer is used to keep track of the raw buffer that this record was
-	// derived from when consuming, after decompression.
-	//
-	// This is used to allow reusing these buffers when record pooling has been enabled
-	// via EnableRecordsPool option.
-	rcBatchBuffer *rcBuffer[byte]
-
-	// rcRawRecordsBuffer is used to keep track of the raw record buffer that this record was
-	// derived from when consuming.
-	//
-	// This is used to allow reusing these buffers when record pooling has been enabled
-	// via EnableRecordsPool option.
-	rcRawRecordsBuffer *rcBuffer[kmsg.Record]
-}
-
-// Reuse releases the record back to the pool.
-//
-//
-// Once this method has been called, any reference to the passed record should be considered invalid by the caller,
-// as it may be reused as a result of future calls to the PollFetches/PollRecords method.
-func (r *Record) Reuse() {
-	if r.recordsPool != nil {
-		r.rcRawRecordsBuffer.release()
-		r.rcBatchBuffer.release()
-		r.recordsPool.put(r)
-	}
 }
 
 func (r *Record) userSize() int64 {
diff --git a/vendor/github.com/twmb/franz-go/pkg/kgo/source.go b/vendor/github.com/twmb/franz-go/pkg/kgo/source.go
index 69fafba7feb..85586a0a412 100644
--- a/vendor/github.com/twmb/franz-go/pkg/kgo/source.go
+++ b/vendor/github.com/twmb/franz-go/pkg/kgo/source.go
@@ -9,61 +9,13 @@ import (
 	"sort"
 	"strings"
 	"sync"
-	"sync/atomic"
 	"time"
 
 	"github.com/twmb/franz-go/pkg/kbin"
 	"github.com/twmb/franz-go/pkg/kerr"
-	"github.com/twmb/franz-go/pkg/kgo/internal/pool"
 	"github.com/twmb/franz-go/pkg/kmsg"
 )
 
-type recordsPool struct{ p *sync.Pool }
-
-func newRecordsPool() *recordsPool {
-	return &recordsPool{
-		p: &sync.Pool{New: func() any { return &Record{} }},
-	}
-}
-
-func (p *recordsPool) get() *Record {
-	return p.p.Get().(*Record)
-}
-
-func (p *recordsPool) put(r *Record) {
-	*r = Record{} // zero out the record
-	p.p.Put(r)
-}
-
-// rcBuffer is a reference counted buffer.
-//
-// The internal buffer will be sent back to the pool after calling release
-// when the ref count reaches 0.
-type rcBuffer[T any] struct {
-	refCount atomic.Int32
-	buffer   []T
-	pool     *pool.BucketedPool[T]
-}
-
-func newRCBuffer[T any](buffer []T, pool *pool.BucketedPool[T]) *rcBuffer[T] {
-	return &rcBuffer[T]{buffer: buffer, pool: pool}
-}
-
-func (b *rcBuffer[T]) acquire() {
-	b.refCount.Add(1)
-}
-
-func (b *rcBuffer[T]) release() {
-	if b.refCount.Add(-1) == 0 {
-		b.pool.Put(b.buffer)
-		b.buffer = nil
-		return
-	}
-	if b.refCount.Load() < 0 {
-		panic("rcBuffer released too many times")
-	}
-}
-
 type readerFrom interface {
 	ReadFrom([]byte) error
 }
@@ -158,12 +110,6 @@ type ProcessFetchPartitionOptions struct {
 
 	// Partition is used to populate the Partition field of each Record.
 	Partition int32
-
-	// DecompressBufferPool is a pool of buffers to use for decompressing batches.
-	DecompressBufferPool *pool.BucketedPool[byte]
-
-	// recordsPool is for internal use only.
-	recordPool *recordsPool
 }
 
 // cursor is where we are consuming from for an individual partition.
@@ -1142,7 +1088,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
 				continue
 			}
 
-			fp := partOffset.processRespPartition(br, rp, s.cl.cfg.hooks, s.cl.cfg.recordsPool, s.cl.cfg.decompressBufferPool)
+			fp := partOffset.processRespPartition(br, rp, s.cl.cfg.hooks)
 			if fp.Err != nil {
 				if moving := kmove.maybeAddFetchPartition(resp, rp, partOffset.from); moving {
 					strip(topic, partition, fp.Err)
@@ -1319,18 +1265,16 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
 
 // processRespPartition processes all records in all potentially compressed
 // batches (or message sets).
-func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchResponseTopicPartition, hooks hooks, recordsPool *recordsPool, decompressBufferPool *pool.BucketedPool[byte]) (fp FetchPartition) {
+func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchResponseTopicPartition, hooks hooks) (fp FetchPartition) {
 	if rp.ErrorCode == 0 {
 		o.hwm = rp.HighWatermark
 	}
 	opts := ProcessFetchPartitionOptions{
-		KeepControlRecords:   br.cl.cfg.keepControl,
-		Offset:               o.offset,
-		IsolationLevel:       IsolationLevel{br.cl.cfg.isolationLevel},
-		Topic:                o.from.topic,
-		Partition:            o.from.partition,
-		DecompressBufferPool: decompressBufferPool,
-		recordPool:           recordsPool,
+		KeepControlRecords: br.cl.cfg.keepControl,
+		Offset:             o.offset,
+		IsolationLevel:     IsolationLevel{br.cl.cfg.isolationLevel},
+		Topic:              o.from.topic,
+		Partition:          o.from.partition,
 	}
 	observeMetrics := func(m FetchBatchMetrics) {
 		hooks.each(func(h Hook) {
@@ -1539,17 +1483,11 @@ func (a aborter) trackAbortedPID(producerID int64) {
 // processing records to fetch part //
 //////////////////////////////////////
 
-var rawRecordsPool = pool.NewBucketedPool[kmsg.Record](32, 16*1024, 2, func(len int) []kmsg.Record {
-	return make([]kmsg.Record, len)
-})
-
 // readRawRecords reads n records from in and returns them, returning early if
 // there were partial records.
 func readRawRecords(n int, in []byte) []kmsg.Record {
-	rs := rawRecordsPool.Get(n)
-	rs = rs[:n]
+	rs := make([]kmsg.Record, n)
 	for i := 0; i < n; i++ {
-		rs[i] = kmsg.Record{}
 		length, used := kbin.Varint(in)
 		total := used + int(length)
 		if used == 0 || length < 0 || len(in) < total {
@@ -1585,7 +1523,7 @@ func processRecordBatch(
 	rawRecords := batch.Records
 	if compression := byte(batch.Attributes & 0x0007); compression != 0 {
 		var err error
-		if rawRecords, err = decompressor.decompress(rawRecords, compression, o.DecompressBufferPool); err != nil {
+		if rawRecords, err = decompressor.decompress(rawRecords, compression); err != nil {
 			return 0, 0 // truncated batch
 		}
 	}
@@ -1612,15 +1550,6 @@ func processRecordBatch(
 		}
 	}()
 
-	var (
-		rcBatchBuff      *rcBuffer[byte]
-		rcRawRecordsBuff *rcBuffer[kmsg.Record]
-	)
-	if o.recordPool != nil {
-		rcBatchBuff = newRCBuffer(rawRecords, o.DecompressBufferPool)
-		rcRawRecordsBuff = newRCBuffer(krecords, rawRecordsPool)
-	}
-
 	abortBatch := aborter.shouldAbortBatch(batch)
 	for i := range krecords {
 		record := recordToRecord(
@@ -1628,10 +1557,9 @@ func processRecordBatch(
 			fp.Partition,
 			batch,
 			&krecords[i],
-			o.recordPool,
 		)
+		o.maybeKeepRecord(fp, record, abortBatch)
 
-		o.maybeKeepRecord(fp, record, rcBatchBuff, rcRawRecordsBuff, abortBatch)
 		if abortBatch && record.Attrs.IsControl() {
 			// A control record has a key and a value where the key
 			// is int16 version and int16 type. Aborted records
@@ -1656,7 +1584,7 @@ func processV1OuterMessage(o *ProcessFetchPartitionOptions, fp *FetchPartition,
 		return 1, 0
 	}
 
-	rawInner, err := decompressor.decompress(message.Value, compression, o.DecompressBufferPool)
+	rawInner, err := decompressor.decompress(message.Value, compression)
 	if err != nil {
 		return 0, 0 // truncated batch
 	}
@@ -1751,7 +1679,7 @@ func processV1Message(
 		return false
 	}
 	record := v1MessageToRecord(o.Topic, fp.Partition, message)
-	o.maybeKeepRecord(fp, record, nil, nil, false)
+	o.maybeKeepRecord(fp, record, false)
 	return true
 }
 
@@ -1769,7 +1697,7 @@ func processV0OuterMessage(
 		return 1, 0 // uncompressed bytes is 0; set to compressed bytes on return
 	}
 
-	rawInner, err := decompressor.decompress(message.Value, compression, o.DecompressBufferPool)
+	rawInner, err := decompressor.decompress(message.Value, compression)
 	if err != nil {
 		return 0, 0 // truncated batch
 	}
@@ -1829,7 +1757,7 @@ func processV0Message(
 		return false
 	}
 	record := v0MessageToRecord(o.Topic, fp.Partition, message)
-	o.maybeKeepRecord(fp, record, nil, nil, false)
+	o.maybeKeepRecord(fp, record, false)
 	return true
 }
 
@@ -1837,7 +1765,7 @@ func processV0Message(
 //
 // If the record is being aborted or the record is a control record and the
 // client does not want to keep control records, this does not keep the record.
-func (o *ProcessFetchPartitionOptions) maybeKeepRecord(fp *FetchPartition, record *Record, rcBatchBuff *rcBuffer[byte], rcRawRecordsBuff *rcBuffer[kmsg.Record], abort bool) {
+func (o *ProcessFetchPartitionOptions) maybeKeepRecord(fp *FetchPartition, record *Record, abort bool) {
 	if record.Offset < o.Offset {
 		// We asked for offset 5, but that was in the middle of a
 		// batch; we got offsets 0 thru 4 that we need to skip.
@@ -1849,13 +1777,6 @@ func (o *ProcessFetchPartitionOptions) maybeKeepRecord(fp *FetchPartition, recor
 		abort = !o.KeepControlRecords
 	}
 	if !abort {
-		if rcBatchBuff != nil && rcRawRecordsBuff != nil {
-			rcBatchBuff.acquire()
-			record.rcBatchBuffer = rcBatchBuff
-
-			rcRawRecordsBuff.acquire()
-			record.rcRawRecordsBuffer = rcRawRecordsBuff
-		}
 		fp.Records = append(fp.Records, record)
 	}
 
@@ -1878,7 +1799,6 @@ func recordToRecord(
 	partition int32,
 	batch *kmsg.RecordBatch,
 	record *kmsg.Record,
-	recordsPool *recordsPool,
 ) *Record {
 	h := make([]RecordHeader, 0, len(record.Headers))
 	for _, kv := range record.Headers {
@@ -1887,25 +1807,19 @@ func recordToRecord(
 			Value: kv.Value,
 		})
 	}
-	var r *Record
-	if recordsPool != nil {
-		r = recordsPool.get()
-	} else {
-		r = new(Record)
-	}
-
-	r.Key = record.Key
-	r.Value = record.Value
-	r.Headers = h
-	r.Topic = topic
-	r.Partition = partition
-	r.Attrs = RecordAttrs{uint8(batch.Attributes)}
-	r.ProducerID = batch.ProducerID
-	r.ProducerEpoch = batch.ProducerEpoch
-	r.LeaderEpoch = batch.PartitionLeaderEpoch
-	r.Offset = batch.FirstOffset + int64(record.OffsetDelta)
-	r.recordsPool = recordsPool
 
+	r := &Record{
+		Key:           record.Key,
+		Value:         record.Value,
+		Headers:       h,
+		Topic:         topic,
+		Partition:     partition,
+		Attrs:         RecordAttrs{uint8(batch.Attributes)},
+		ProducerID:    batch.ProducerID,
+		ProducerEpoch: batch.ProducerEpoch,
+		LeaderEpoch:   batch.PartitionLeaderEpoch,
+		Offset:        batch.FirstOffset + int64(record.OffsetDelta),
+	}
 	if r.Attrs.TimestampType() == 0 {
 		r.Timestamp = timeFromMillis(batch.FirstTimestamp + record.TimestampDelta64)
 	} else {
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 6436041628b..92725a413fd 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -1143,12 +1143,11 @@ github.com/tklauser/go-sysconf
 # github.com/tklauser/numcpus v0.6.1
 ## explicit; go 1.13
 github.com/tklauser/numcpus
-# github.com/twmb/franz-go v1.17.1 => github.com/grafana/franz-go v0.0.0-20241003103709-f8f76a439eb4
+# github.com/twmb/franz-go v1.17.1 => github.com/dimitarvdimitrov/franz-go v0.0.0-20240904145554-ceadc28d3bd9
 ## explicit; go 1.21
 github.com/twmb/franz-go/pkg/kbin
 github.com/twmb/franz-go/pkg/kerr
 github.com/twmb/franz-go/pkg/kgo
-github.com/twmb/franz-go/pkg/kgo/internal/pool
 github.com/twmb/franz-go/pkg/kgo/internal/sticky
 github.com/twmb/franz-go/pkg/kversion
 github.com/twmb/franz-go/pkg/sasl
@@ -1673,5 +1672,5 @@ sigs.k8s.io/yaml/goyaml.v3
 # github.com/opentracing-contrib/go-stdlib => github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956
 # github.com/opentracing-contrib/go-grpc => github.com/charleskorn/go-grpc v0.0.0-20231024023642-e9298576254f
 # github.com/prometheus/alertmanager => github.com/grafana/prometheus-alertmanager v0.25.1-0.20240924175849-b8b7c2c74eb6
-# github.com/twmb/franz-go => github.com/grafana/franz-go v0.0.0-20241003103709-f8f76a439eb4
+# github.com/twmb/franz-go => github.com/dimitarvdimitrov/franz-go v0.0.0-20240904145554-ceadc28d3bd9
 # google.golang.org/grpc => google.golang.org/grpc v1.65.0