diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go index 775a22e6..197c3933 100644 --- a/pkg/kgo/client.go +++ b/pkg/kgo/client.go @@ -78,8 +78,7 @@ type Client struct { producer producer consumer consumer - compressor *compressor - decompressor *decompressor + compressor *compressor coordinatorsMu sync.Mutex coordinators map[coordinatorKey]*coordinatorLoad @@ -482,8 +481,7 @@ func NewClient(opts ...Opt) (*Client, error) { bufPool: newBufPool(), prsPool: newPrsPool(), - compressor: compressor, - decompressor: newDecompressor(), + compressor: compressor, coordinators: make(map[coordinatorKey]*coordinatorLoad), diff --git a/pkg/kgo/compression.go b/pkg/kgo/compression.go index fe8ad645..1adbe69a 100644 --- a/pkg/kgo/compression.go +++ b/pkg/kgo/compression.go @@ -12,6 +12,8 @@ import ( "github.com/klauspost/compress/s2" "github.com/klauspost/compress/zstd" "github.com/pierrec/lz4/v4" + + "github.com/twmb/franz-go/pkg/kgo/internal/pool" ) var byteBuffers = sync.Pool{New: func() any { return bytes.NewBuffer(make([]byte, 8<<10)) }} @@ -235,6 +237,8 @@ type decompressor struct { unzstdPool sync.Pool } +var defaultDecompressor = newDecompressor() + func newDecompressor() *decompressor { d := &decompressor{ ungzPool: sync.Pool{ @@ -264,15 +268,23 @@ type zstdDecoder struct { inner *zstd.Decoder } -func (d *decompressor) decompress(src []byte, codec byte) ([]byte, error) { +func (d *decompressor) decompress(src []byte, codec byte, pool *pool.BucketedPool[byte]) ([]byte, error) { // Early return in case there is no compression compCodec := codecType(codec) if compCodec == codecNone { return src, nil } - out := byteBuffers.Get().(*bytes.Buffer) - out.Reset() - defer byteBuffers.Put(out) + + out, buf, err := d.getDecodedBuffer(src, compCodec, pool) + if err != nil { + return nil, err + } + defer func() { + if compCodec == codecSnappy { + return + } + pool.Put(buf) + }() switch compCodec { case codecGzip: @@ -284,7 +296,7 @@ func (d *decompressor) decompress(src []byte, codec byte) ([]byte, error) { if _, err := io.Copy(out, ungz); err != nil { return nil, err } - return append([]byte(nil), out.Bytes()...), nil + return d.copyDecodedBuffer(out.Bytes(), compCodec, pool), nil case codecSnappy: if len(src) > 16 && bytes.HasPrefix(src, xerialPfx) { return xerialDecode(src) @@ -293,7 +305,7 @@ func (d *decompressor) decompress(src []byte, codec byte) ([]byte, error) { if err != nil { return nil, err } - return append([]byte(nil), decoded...), nil + return d.copyDecodedBuffer(decoded, compCodec, pool), nil case codecLZ4: unlz4 := d.unlz4Pool.Get().(*lz4.Reader) defer d.unlz4Pool.Put(unlz4) @@ -301,7 +313,7 @@ func (d *decompressor) decompress(src []byte, codec byte) ([]byte, error) { if _, err := io.Copy(out, unlz4); err != nil { return nil, err } - return append([]byte(nil), out.Bytes()...), nil + return d.copyDecodedBuffer(out.Bytes(), compCodec, pool), nil case codecZstd: unzstd := d.unzstdPool.Get().(*zstdDecoder) defer d.unzstdPool.Put(unzstd) @@ -309,12 +321,43 @@ func (d *decompressor) decompress(src []byte, codec byte) ([]byte, error) { if err != nil { return nil, err } - return append([]byte(nil), decoded...), nil + return d.copyDecodedBuffer(decoded, compCodec, pool), nil default: return nil, errors.New("unknown compression codec") } } +func (d *decompressor) getDecodedBuffer(src []byte, compCodec codecType, pool *pool.BucketedPool[byte]) (*bytes.Buffer, []byte, error) { + var ( + decodedBufSize int + err error + ) + switch compCodec { + case codecSnappy: + decodedBufSize, err = s2.DecodedLen(src) + if err != nil { + return nil, nil, err + } + + default: + // Make a guess at the output size. + decodedBufSize = len(src) * 2 + } + buf := pool.Get(decodedBufSize)[:0] + + return bytes.NewBuffer(buf), buf, nil +} + +func (d *decompressor) copyDecodedBuffer(decoded []byte, compCodec codecType, pool *pool.BucketedPool[byte]) []byte { + if compCodec == codecSnappy { + // We already know the actual size of the decoded buffer before decompression, + // so there's no need to copy the buffer. + return decoded + } + out := pool.Get(len(decoded)) + return append(out[:0], decoded...) +} + var xerialPfx = []byte{130, 83, 78, 65, 80, 80, 89, 0} var errMalformedXerial = errors.New("malformed xerial framing") diff --git a/pkg/kgo/compression_test.go b/pkg/kgo/compression_test.go index e8fb6729..005e3dfd 100644 --- a/pkg/kgo/compression_test.go +++ b/pkg/kgo/compression_test.go @@ -10,6 +10,8 @@ import ( "testing" "github.com/pierrec/lz4/v4" + + "github.com/twmb/franz-go/pkg/kgo/internal/pool" ) // Regression test for #778. @@ -78,6 +80,8 @@ func TestCompressDecompress(t *testing.T) { randStr(1 << 8), } + buffPool := pool.NewBucketedPool(1, 1<<16, 2, func(int) []byte { return make([]byte, 1<<16) }) + var wg sync.WaitGroup for _, produceVersion := range []int16{ 0, 7, @@ -110,7 +114,7 @@ func TestCompressDecompress(t *testing.T) { w.Reset() got, used := c.compress(w, in, produceVersion) - got, err := d.decompress(got, byte(used)) + got, err := d.decompress(got, byte(used), buffPool) if err != nil { t.Errorf("unexpected decompress err: %v", err) return @@ -156,7 +160,7 @@ func BenchmarkDecompress(b *testing.B) { b.Run(fmt.Sprint(codec), func(b *testing.B) { for i := 0; i < b.N; i++ { d := newDecompressor() - d.decompress(w.Bytes(), byte(codec)) + d.decompress(w.Bytes(), byte(codec), pool.NewBucketedPool(1, 1<<16, 2, func(int) []byte { return make([]byte, 1<<16) })) } }) byteBuffers.Put(w) diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go index 92ebeaa3..dabaf03f 100644 --- a/pkg/kgo/config.go +++ b/pkg/kgo/config.go @@ -16,6 +16,8 @@ import ( "github.com/twmb/franz-go/pkg/kmsg" "github.com/twmb/franz-go/pkg/kversion" "github.com/twmb/franz-go/pkg/sasl" + + "github.com/twmb/franz-go/pkg/kgo/internal/pool" ) // Opt is an option to configure a client. @@ -151,6 +153,9 @@ type cfg struct { partitions map[string]map[int32]Offset // partitions to directly consume from regex bool + recordsPool *recordsPool + decompressBufferPool *pool.BucketedPool[byte] + //////////////////////////// // CONSUMER GROUP SECTION // //////////////////////////// @@ -389,6 +394,11 @@ func (cfg *cfg) validate() error { } cfg.hooks = processedHooks + // Assume a 2x compression ratio. + maxDecompressedBatchSize := int(cfg.maxBytes.load()) * 2 + cfg.decompressBufferPool = pool.NewBucketedPool[byte](4096, maxDecompressedBatchSize, 2, func(sz int) []byte { + return make([]byte, sz) + }) return nil } @@ -1347,6 +1357,18 @@ func ConsumeRegex() ConsumerOpt { return consumerOpt{func(cfg *cfg) { cfg.regex = true }} } +// EnableRecordsPool sets the client to obtain the *kgo.Record objects from a pool, +// in order to minimize the number of allocations. +// +// By enabling this option, the records returned by PollFetches/PollRecords +// can be sent back to the pool via ReuseRecords method in order to be recycled. +// +// This option is particularly useful for use cases where the volume of generated records is very high, +// as it can negatively impact performance due to the extra GC overhead. +func EnableRecordsPool() ConsumerOpt { + return consumerOpt{func(cfg *cfg) { cfg.recordsPool = newRecordsPool() }} +} + // DisableFetchSessions sets the client to not use fetch sessions (Kafka 1.0+). // // A "fetch session" is is a way to reduce bandwidth for fetch requests & diff --git a/pkg/kgo/internal/pool/bucketed_pool.go b/pkg/kgo/internal/pool/bucketed_pool.go new file mode 100644 index 00000000..b5c435a5 --- /dev/null +++ b/pkg/kgo/internal/pool/bucketed_pool.go @@ -0,0 +1,94 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pool + +import ( + "sync" +) + +// BucketedPool is a bucketed pool for variably sized slices. +type BucketedPool[T any] struct { + buckets []sync.Pool + sizes []int + // make is the function used to create an empty slice when none exist yet. + make func(int) []T +} + +// NewBucketedPool returns a new BucketedPool with size buckets for minSize to maxSize +// increasing by the given factor. +func NewBucketedPool[T any](minSize, maxSize int, factor float64, makeFunc func(int) []T) *BucketedPool[T] { + if minSize < 1 { + panic("invalid minimum pool size") + } + if maxSize < 1 { + panic("invalid maximum pool size") + } + if factor < 1 { + panic("invalid factor") + } + + var sizes []int + + for s := minSize; s <= maxSize; s = int(float64(s) * factor) { + sizes = append(sizes, s) + } + + p := &BucketedPool[T]{ + buckets: make([]sync.Pool, len(sizes)), + sizes: sizes, + make: makeFunc, + } + return p +} + +// Get returns a new slice with capacity greater than or equal to size. +func (p *BucketedPool[T]) Get(size int) []T { + for i, bktSize := range p.sizes { + if size > bktSize { + continue + } + buff := p.buckets[i].Get() + if buff == nil { + buff = p.make(bktSize) + } + return buff.([]T) + } + return p.make(size) +} + +// Put adds a slice to the right bucket in the pool. +// If the slice does not belong to any bucket in the pool, it is ignored. +func (p *BucketedPool[T]) Put(s []T) { + sCap := cap(s) + if sCap < p.sizes[0] { + return + } + + for i, size := range p.sizes { + if sCap > size { + continue + } + + if sCap == size { + // Buffer is exactly the minimum size for this bucket. Add it to this bucket. + p.buckets[i].Put(s) + } else { + // Buffer belongs in previous bucket. + p.buckets[i-1].Put(s) + } + return + } +} + + diff --git a/pkg/kgo/internal/pool/bucketed_pool_test.go b/pkg/kgo/internal/pool/bucketed_pool_test.go new file mode 100644 index 00000000..b83baab3 --- /dev/null +++ b/pkg/kgo/internal/pool/bucketed_pool_test.go @@ -0,0 +1,68 @@ +// SPDX-License-Identifier: Apache-2.0 +// Provenance-includes-location: https://github.com/prometheus/prometheus/blob/main/util/pool/pool_test.go +// Provenance-includes-copyright: The Prometheus Authors + +package pool + +import ( + "testing" +) + +func makeFunc(size int) []int { + return make([]int, 0, size) +} + +func TestBucketedPool_HappyPath(t *testing.T) { + testPool := NewBucketedPool(1, 8, 2, makeFunc) + cases := []struct { + size int + expectedCap int + }{ + { + size: -1, + expectedCap: 1, + }, + { + size: 3, + expectedCap: 4, + }, + { + size: 10, + expectedCap: 10, + }, + } + for _, c := range cases { + ret := testPool.Get(c.size) + if cap(ret) < c.expectedCap { + t.Fatalf("expected cap >= %d, got %d", c.expectedCap , cap(ret)) + } + testPool.Put(ret) + } +} + +func TestBucketedPool_SliceNotAlignedToBuckets(t *testing.T) { + pool := NewBucketedPool(1, 1000, 10, makeFunc) + pool.Put(make([]int, 0, 2)) + s := pool.Get(3) + if cap(s) < 3 { + t.Fatalf("expected cap >= 3, got %d", cap(s)) + } +} + +func TestBucketedPool_PutEmptySlice(t *testing.T) { + pool := NewBucketedPool(1, 1000, 10, makeFunc) + pool.Put([]int{}) + s := pool.Get(1) + if cap(s) < 1 { + t.Fatalf("expected cap >= 1, got %d", cap(s)) + } +} + +func TestBucketedPool_PutSliceSmallerThanMinimum(t *testing.T) { + pool := NewBucketedPool(3, 1000, 10, makeFunc) + pool.Put([]int{1, 2}) + s := pool.Get(3) + if cap(s) < 3 { + t.Fatalf("expected cap >= 3, got %d", cap(s)) + } +} diff --git a/pkg/kgo/record_and_fetch.go b/pkg/kgo/record_and_fetch.go index 4f1ebe6f..dbfb9451 100644 --- a/pkg/kgo/record_and_fetch.go +++ b/pkg/kgo/record_and_fetch.go @@ -6,6 +6,8 @@ import ( "reflect" "time" "unsafe" + + "github.com/twmb/franz-go/pkg/kmsg" ) // RecordHeader contains extra information that can be sent with Records. @@ -149,6 +151,38 @@ type Record struct { // producer hooks. It can also be set in a consumer hook to propagate // enrichment to consumer clients. Context context.Context + + // recordsPool is the pool that this record was fetched from, if any. + // + // When reused, record is returned to this pool. + recordsPool *recordsPool + + // rcBatchBuffer is used to keep track of the raw buffer that this record was + // derived from when consuming, after decompression. + // + // This is used to allow reusing these buffers when record pooling has been enabled + // via EnableRecordsPool option. + rcBatchBuffer *rcBuffer[byte] + + // rcRawRecordsBuffer is used to keep track of the raw record buffer that this record was + // derived from when consuming. + // + // This is used to allow reusing these buffers when record pooling has been enabled + // via EnableRecordsPool option. + rcRawRecordsBuffer *rcBuffer[kmsg.Record] +} + +// Reuse releases the record back to the pool. +// +// +// Once this method has been called, any reference to the passed record should be considered invalid by the caller, +// as it may be reused as a result of future calls to the PollFetches/PollRecords method. +func (r *Record) Reuse() { + if r.recordsPool != nil { + r.rcRawRecordsBuffer.release() + r.rcBatchBuffer.release() + r.recordsPool.put(r) + } } func (r *Record) userSize() int64 { diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go index 0c475d14..69fafba7 100644 --- a/pkg/kgo/source.go +++ b/pkg/kgo/source.go @@ -9,13 +9,61 @@ import ( "sort" "strings" "sync" + "sync/atomic" "time" "github.com/twmb/franz-go/pkg/kbin" "github.com/twmb/franz-go/pkg/kerr" + "github.com/twmb/franz-go/pkg/kgo/internal/pool" "github.com/twmb/franz-go/pkg/kmsg" ) +type recordsPool struct{ p *sync.Pool } + +func newRecordsPool() *recordsPool { + return &recordsPool{ + p: &sync.Pool{New: func() any { return &Record{} }}, + } +} + +func (p *recordsPool) get() *Record { + return p.p.Get().(*Record) +} + +func (p *recordsPool) put(r *Record) { + *r = Record{} // zero out the record + p.p.Put(r) +} + +// rcBuffer is a reference counted buffer. +// +// The internal buffer will be sent back to the pool after calling release +// when the ref count reaches 0. +type rcBuffer[T any] struct { + refCount atomic.Int32 + buffer []T + pool *pool.BucketedPool[T] +} + +func newRCBuffer[T any](buffer []T, pool *pool.BucketedPool[T]) *rcBuffer[T] { + return &rcBuffer[T]{buffer: buffer, pool: pool} +} + +func (b *rcBuffer[T]) acquire() { + b.refCount.Add(1) +} + +func (b *rcBuffer[T]) release() { + if b.refCount.Add(-1) == 0 { + b.pool.Put(b.buffer) + b.buffer = nil + return + } + if b.refCount.Load() < 0 { + panic("rcBuffer released too many times") + } +} + type readerFrom interface { ReadFrom([]byte) error } @@ -92,6 +140,32 @@ func (s *source) removeCursor(rm *cursor) { } } +type ProcessFetchPartitionOptions struct { + // KeepControlRecords sets the parser to keep control messages and return + // them with fetches, overriding the default that discards them. + // + // Generally, control messages are not useful. It is the same as kgo.KeepControlRecords(). + KeepControlRecords bool + + // Offset is the minimum offset for which we'll parse records. Records with lower offsets will not be parsed or returned. + Offset int64 + + // IsolationLevel controls whether or not to return uncomitted records. See kgo.IsolationLevel. + IsolationLevel IsolationLevel + + // Topic is used to populate the Topic field of each Record. + Topic string + + // Topic is used to populate the Partition field of each Record. + Partition int32 + + // DecompressBufferPool is a pool of buffers to use for decompressing batches. + DecompressBufferPool *pool.BucketedPool[byte] + + // recordsPool is for internal use only. + recordPool *recordsPool +} + // cursor is where we are consuming from for an individual partition. type cursor struct { topic string @@ -1068,7 +1142,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe continue } - fp := partOffset.processRespPartition(br, rp, s.cl.decompressor, s.cl.cfg.hooks) + fp := partOffset.processRespPartition(br, rp, s.cl.cfg.hooks, s.cl.cfg.recordsPool, s.cl.cfg.decompressBufferPool) if fp.Err != nil { if moving := kmove.maybeAddFetchPartition(resp, rp, partOffset.from); moving { strip(topic, partition, fp.Err) @@ -1245,7 +1319,43 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe // processRespPartition processes all records in all potentially compressed // batches (or message sets). -func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchResponseTopicPartition, decompressor *decompressor, hooks hooks) FetchPartition { +func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchResponseTopicPartition, hooks hooks, recordsPool *recordsPool, decompressBufferPool *pool.BucketedPool[byte]) (fp FetchPartition) { + if rp.ErrorCode == 0 { + o.hwm = rp.HighWatermark + } + opts := ProcessFetchPartitionOptions{ + KeepControlRecords: br.cl.cfg.keepControl, + Offset: o.offset, + IsolationLevel: IsolationLevel{br.cl.cfg.isolationLevel}, + Topic: o.from.topic, + Partition: o.from.partition, + DecompressBufferPool: decompressBufferPool, + recordPool: recordsPool, + } + observeMetrics := func(m FetchBatchMetrics) { + hooks.each(func(h Hook) { + if h, ok := h.(HookFetchBatchRead); ok { + h.OnFetchBatchRead(br.meta, o.from.topic, o.from.partition, m) + } + }) + } + fp, o.offset = ProcessRespPartition(opts, rp, observeMetrics) + if len(fp.Records) > 0 { + lastRecord := fp.Records[len(fp.Records)-1] + // We adjust the offset separately because it may be larger than the offset of the last record for compacted partitions. + o.lastConsumedEpoch = lastRecord.LeaderEpoch + o.lastConsumedTime = lastRecord.Timestamp + } + + return fp +} + +// ProcessRespPartition processes all records in all potentially compressed batches (or message sets). +// ProcessRespPartition returns the FetchPartition and the last offset of records processed. observeMetrics can be nil. +// This is useful when issuing manual Fetch requests for records. +// In case of a compacted partition, the last offset may be larger than the offset of the last record. +// If the partition response is truncated and the partiiton was compacted, then the last offset is the offset of the last record. +func ProcessRespPartition(o ProcessFetchPartitionOptions, rp *kmsg.FetchResponseTopicPartition, observeMetrics func(FetchBatchMetrics)) (FetchPartition, int64) { fp := FetchPartition{ Partition: rp.Partition, Err: kerr.ErrorForCode(rp.ErrorCode), @@ -1253,12 +1363,9 @@ func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchRespon LastStableOffset: rp.LastStableOffset, LogStartOffset: rp.LogStartOffset, } - if rp.ErrorCode == 0 { - o.hwm = rp.HighWatermark - } var aborter aborter - if br.cl.cfg.isolationLevel == 1 { + if o.IsolationLevel.level == 1 { aborter = buildAborter(rp) } @@ -1349,10 +1456,10 @@ func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchRespon default: fp.Err = fmt.Errorf("unknown magic %d; message offset is %d and length is %d, skipping and setting to next offset", magic, offset, length) - if next := offset + 1; next > o.offset { - o.offset = next + if next := offset + 1; next > o.Offset { + o.Offset = next } - return fp + return fp, o.Offset } if !check() { @@ -1367,30 +1474,27 @@ func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchRespon case *kmsg.MessageV0: m.CompressedBytes = int(length) // for message sets, we include the message set overhead in length m.CompressionType = uint8(t.Attributes) & 0b0000_0111 - m.NumRecords, m.UncompressedBytes = o.processV0OuterMessage(&fp, t, decompressor) + m.NumRecords, m.UncompressedBytes = processV0OuterMessage(&o, &fp, t, defaultDecompressor) case *kmsg.MessageV1: m.CompressedBytes = int(length) m.CompressionType = uint8(t.Attributes) & 0b0000_0111 - m.NumRecords, m.UncompressedBytes = o.processV1OuterMessage(&fp, t, decompressor) + m.NumRecords, m.UncompressedBytes = processV1OuterMessage(&o, &fp, t, defaultDecompressor) case *kmsg.RecordBatch: m.CompressedBytes = len(t.Records) // for record batches, we only track the record batch length m.CompressionType = uint8(t.Attributes) & 0b0000_0111 - m.NumRecords, m.UncompressedBytes = o.processRecordBatch(&fp, t, aborter, decompressor) + m.NumRecords, m.UncompressedBytes = processRecordBatch(&o, &fp, t, aborter, defaultDecompressor) } if m.UncompressedBytes == 0 { m.UncompressedBytes = m.CompressedBytes } - hooks.each(func(h Hook) { - if h, ok := h.(HookFetchBatchRead); ok { - h.OnFetchBatchRead(br.meta, o.from.topic, o.from.partition, m) - } - }) + if observeMetrics != nil { + observeMetrics(m) + } } - - return fp + return fp, o.Offset } type aborter map[int64][]int64 @@ -1435,11 +1539,17 @@ func (a aborter) trackAbortedPID(producerID int64) { // processing records to fetch part // ////////////////////////////////////// +var rawRecordsPool = pool.NewBucketedPool[kmsg.Record](32, 16*1024, 2, func(len int) []kmsg.Record { + return make([]kmsg.Record, len) +}) + // readRawRecords reads n records from in and returns them, returning early if // there were partial records. func readRawRecords(n int, in []byte) []kmsg.Record { - rs := make([]kmsg.Record, n) + rs := rawRecordsPool.Get(n) + rs = rs[:n] for i := 0; i < n; i++ { + rs[i] = kmsg.Record{} length, used := kbin.Varint(in) total := used + int(length) if used == 0 || length < 0 || len(in) < total { @@ -1453,7 +1563,8 @@ func readRawRecords(n int, in []byte) []kmsg.Record { return rs } -func (o *cursorOffsetNext) processRecordBatch( +func processRecordBatch( + o *ProcessFetchPartitionOptions, fp *FetchPartition, batch *kmsg.RecordBatch, aborter aborter, @@ -1464,7 +1575,7 @@ func (o *cursorOffsetNext) processRecordBatch( return 0, 0 } lastOffset := batch.FirstOffset + int64(batch.LastOffsetDelta) - if lastOffset < o.offset { + if lastOffset < o.Offset { // If the last offset in this batch is less than what we asked // for, we got a batch that we entirely do not need. We can // avoid all work (although we should not get this batch). @@ -1474,7 +1585,7 @@ func (o *cursorOffsetNext) processRecordBatch( rawRecords := batch.Records if compression := byte(batch.Attributes & 0x0007); compression != 0 { var err error - if rawRecords, err = decompressor.decompress(rawRecords, compression); err != nil { + if rawRecords, err = decompressor.decompress(rawRecords, compression, o.DecompressBufferPool); err != nil { return 0, 0 // truncated batch } } @@ -1496,21 +1607,31 @@ func (o *cursorOffsetNext) processRecordBatch( // either advance offsets or will set to nextAskOffset. nextAskOffset := lastOffset + 1 defer func() { - if numRecords == len(krecords) && o.offset < nextAskOffset { - o.offset = nextAskOffset + if numRecords == len(krecords) && o.Offset < nextAskOffset { + o.Offset = nextAskOffset } }() + var ( + rcBatchBuff *rcBuffer[byte] + rcRawRecordsBuff *rcBuffer[kmsg.Record] + ) + if o.recordPool != nil { + rcBatchBuff = newRCBuffer(rawRecords, o.DecompressBufferPool) + rcRawRecordsBuff = newRCBuffer(krecords, rawRecordsPool) + } + abortBatch := aborter.shouldAbortBatch(batch) for i := range krecords { record := recordToRecord( - o.from.topic, + o.Topic, fp.Partition, batch, &krecords[i], + o.recordPool, ) - o.maybeKeepRecord(fp, record, abortBatch) + o.maybeKeepRecord(fp, record, rcBatchBuff, rcRawRecordsBuff, abortBatch) if abortBatch && record.Attrs.IsControl() { // A control record has a key and a value where the key // is int16 version and int16 type. Aborted records @@ -1528,18 +1649,14 @@ func (o *cursorOffsetNext) processRecordBatch( // this easy, but if not, we decompress and process each inner message as // either v0 or v1. We only expect the inner message to be v1, but technically // a crazy pipeline could have v0 anywhere. -func (o *cursorOffsetNext) processV1OuterMessage( - fp *FetchPartition, - message *kmsg.MessageV1, - decompressor *decompressor, -) (int, int) { +func processV1OuterMessage(o *ProcessFetchPartitionOptions, fp *FetchPartition, message *kmsg.MessageV1, decompressor *decompressor) (int, int) { compression := byte(message.Attributes & 0x0003) if compression == 0 { - o.processV1Message(fp, message) + processV1Message(o, fp, message) return 1, 0 } - rawInner, err := decompressor.decompress(message.Value, compression) + rawInner, err := decompressor.decompress(message.Value, compression, o.DecompressBufferPool) if err != nil { return 0, 0 // truncated batch } @@ -1606,13 +1723,13 @@ out: case *kmsg.MessageV0: innerMessage.Offset = firstOffset + int64(i) innerMessage.Attributes |= int8(compression) - if !o.processV0Message(fp, innerMessage) { + if !processV0Message(o, fp, innerMessage) { return i, uncompressedBytes } case *kmsg.MessageV1: innerMessage.Offset = firstOffset + int64(i) innerMessage.Attributes |= int8(compression) - if !o.processV1Message(fp, innerMessage) { + if !processV1Message(o, fp, innerMessage) { return i, uncompressedBytes } } @@ -1620,7 +1737,8 @@ out: return len(innerMessages), uncompressedBytes } -func (o *cursorOffsetNext) processV1Message( +func processV1Message( + o *ProcessFetchPartitionOptions, fp *FetchPartition, message *kmsg.MessageV1, ) bool { @@ -1632,25 +1750,26 @@ func (o *cursorOffsetNext) processV1Message( fp.Err = fmt.Errorf("unknown attributes on message %d", message.Attributes) return false } - record := v1MessageToRecord(o.from.topic, fp.Partition, message) - o.maybeKeepRecord(fp, record, false) + record := v1MessageToRecord(o.Topic, fp.Partition, message) + o.maybeKeepRecord(fp, record, nil, nil, false) return true } // Processes an outer v0 message. We expect inner messages to be entirely v0 as // well, so this only tries v0 always. -func (o *cursorOffsetNext) processV0OuterMessage( +func processV0OuterMessage( + o *ProcessFetchPartitionOptions, fp *FetchPartition, message *kmsg.MessageV0, decompressor *decompressor, ) (int, int) { compression := byte(message.Attributes & 0x0003) if compression == 0 { - o.processV0Message(fp, message) + processV0Message(o, fp, message) return 1, 0 // uncompressed bytes is 0; set to compressed bytes on return } - rawInner, err := decompressor.decompress(message.Value, compression) + rawInner, err := decompressor.decompress(message.Value, compression, o.DecompressBufferPool) if err != nil { return 0, 0 // truncated batch } @@ -1689,14 +1808,15 @@ func (o *cursorOffsetNext) processV0OuterMessage( innerMessage := &innerMessages[i] innerMessage.Attributes |= int8(compression) innerMessage.Offset = firstOffset + int64(i) - if !o.processV0Message(fp, innerMessage) { + if !processV0Message(o, fp, innerMessage) { return i, uncompressedBytes } } return len(innerMessages), uncompressedBytes } -func (o *cursorOffsetNext) processV0Message( +func processV0Message( + o *ProcessFetchPartitionOptions, fp *FetchPartition, message *kmsg.MessageV0, ) bool { @@ -1708,8 +1828,8 @@ func (o *cursorOffsetNext) processV0Message( fp.Err = fmt.Errorf("unknown attributes on message %d", message.Attributes) return false } - record := v0MessageToRecord(o.from.topic, fp.Partition, message) - o.maybeKeepRecord(fp, record, false) + record := v0MessageToRecord(o.Topic, fp.Partition, message) + o.maybeKeepRecord(fp, record, nil, nil, false) return true } @@ -1717,8 +1837,8 @@ func (o *cursorOffsetNext) processV0Message( // // If the record is being aborted or the record is a control record and the // client does not want to keep control records, this does not keep the record. -func (o *cursorOffsetNext) maybeKeepRecord(fp *FetchPartition, record *Record, abort bool) { - if record.Offset < o.offset { +func (o *ProcessFetchPartitionOptions) maybeKeepRecord(fp *FetchPartition, record *Record, rcBatchBuff *rcBuffer[byte], rcRawRecordsBuff *rcBuffer[kmsg.Record], abort bool) { + if record.Offset < o.Offset { // We asked for offset 5, but that was in the middle of a // batch; we got offsets 0 thru 4 that we need to skip. return @@ -1726,17 +1846,22 @@ func (o *cursorOffsetNext) maybeKeepRecord(fp *FetchPartition, record *Record, a // We only keep control records if specifically requested. if record.Attrs.IsControl() { - abort = !o.from.keepControl + abort = !o.KeepControlRecords } if !abort { + if rcBatchBuff != nil && rcRawRecordsBuff != nil { + rcBatchBuff.acquire() + record.rcBatchBuffer = rcBatchBuff + + rcRawRecordsBuff.acquire() + record.rcRawRecordsBuffer = rcRawRecordsBuff + } fp.Records = append(fp.Records, record) } // The record offset may be much larger than our expected offset if the // topic is compacted. - o.offset = record.Offset + 1 - o.lastConsumedEpoch = record.LeaderEpoch - o.lastConsumedTime = record.Timestamp + o.Offset = record.Offset + 1 } /////////////////////////////// @@ -1753,6 +1878,7 @@ func recordToRecord( partition int32, batch *kmsg.RecordBatch, record *kmsg.Record, + recordsPool *recordsPool, ) *Record { h := make([]RecordHeader, 0, len(record.Headers)) for _, kv := range record.Headers { @@ -1761,19 +1887,25 @@ func recordToRecord( Value: kv.Value, }) } - - r := &Record{ - Key: record.Key, - Value: record.Value, - Headers: h, - Topic: topic, - Partition: partition, - Attrs: RecordAttrs{uint8(batch.Attributes)}, - ProducerID: batch.ProducerID, - ProducerEpoch: batch.ProducerEpoch, - LeaderEpoch: batch.PartitionLeaderEpoch, - Offset: batch.FirstOffset + int64(record.OffsetDelta), + var r *Record + if recordsPool != nil { + r = recordsPool.get() + } else { + r = new(Record) } + + r.Key = record.Key + r.Value = record.Value + r.Headers = h + r.Topic = topic + r.Partition = partition + r.Attrs = RecordAttrs{uint8(batch.Attributes)} + r.ProducerID = batch.ProducerID + r.ProducerEpoch = batch.ProducerEpoch + r.LeaderEpoch = batch.PartitionLeaderEpoch + r.Offset = batch.FirstOffset + int64(record.OffsetDelta) + r.recordsPool = recordsPool + if r.Attrs.TimestampType() == 0 { r.Timestamp = timeFromMillis(batch.FirstTimestamp + record.TimestampDelta64) } else {