diff --git a/pkg/kgo/config.go b/pkg/kgo/config.go
index 92ebeaa3..8113d35a 100644
--- a/pkg/kgo/config.go
+++ b/pkg/kgo/config.go
@@ -151,6 +151,8 @@ type cfg struct {
 	partitions map[string]map[int32]Offset // partitions to directly consume from
 	regex      bool
 
+	recordsPool recordsPool
+
 	////////////////////////////
 	// CONSUMER GROUP SECTION //
 	////////////////////////////
@@ -1347,6 +1349,18 @@ func ConsumeRegex() ConsumerOpt {
 	return consumerOpt{func(cfg *cfg) { cfg.regex = true }}
 }
 
+// EnableRecordsPool sets the client to obtain *kgo.Record objects from a pool,
+// in order to minimize the number of allocations.
+//
+// When this option is enabled, the records returned by PollFetches/PollRecords
+// can be sent back to the pool via the Reuse method in order to be recycled.
+//
+// This option is particularly useful for consumers that process a very high volume
+// of records, where the constant allocation of records adds significant GC overhead.
+func EnableRecordsPool() ConsumerOpt {
+	return consumerOpt{func(cfg *cfg) { cfg.recordsPool = newRecordsPool() }}
+}
+
 // DisableFetchSessions sets the client to not use fetch sessions (Kafka 1.0+).
 //
 // A "fetch session" is a way to reduce bandwidth for fetch requests &
diff --git a/pkg/kgo/record_and_fetch.go b/pkg/kgo/record_and_fetch.go
index 4f1ebe6f..44765169 100644
--- a/pkg/kgo/record_and_fetch.go
+++ b/pkg/kgo/record_and_fetch.go
@@ -149,6 +149,19 @@ type Record struct {
 	// producer hooks. It can also be set in a consumer hook to propagate
 	// enrichment to consumer clients.
 	Context context.Context
+
+	// recordsPool is the pool that this record was fetched from, if any.
+	//
+	// When reused, the record is returned to this pool.
+	recordsPool recordsPool
+}
+
+// Reuse releases the record back to the pool it was fetched from.
+//
+// Once this method has been called, any reference to the record should be considered invalid by the caller,
+// as the record may be recycled by future calls to PollFetches/PollRecords.
+func (r *Record) Reuse() {
+	r.recordsPool.put(r)
 }
 
 func (r *Record) userSize() int64 {
diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go
index 0c475d14..f7e267bd 100644
--- a/pkg/kgo/source.go
+++ b/pkg/kgo/source.go
@@ -16,6 +16,29 @@ import (
 	"github.com/twmb/franz-go/pkg/kmsg"
 )
 
+type recordsPool struct{ p *sync.Pool } // p is nil when pooling is disabled
+
+func newRecordsPool() recordsPool {
+	return recordsPool{
+		p: &sync.Pool{New: func() any { return &Record{} }},
+	}
+}
+
+func (p recordsPool) get() *Record {
+	if p.p == nil { // pooling disabled: allocate a fresh record
+		return &Record{}
+	}
+	return p.p.Get().(*Record)
+}
+
+func (p recordsPool) put(r *Record) {
+	if p.p == nil { // pooling disabled: nothing to recycle
+		return
+	}
+	*r = Record{} // zero out the record before pooling
+	p.p.Put(r)
+}
+
 type readerFrom interface {
 	ReadFrom([]byte) error
 }
@@ -1068,7 +1091,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
 			continue
 		}
 
-		fp := partOffset.processRespPartition(br, rp, s.cl.decompressor, s.cl.cfg.hooks)
+		fp := partOffset.processRespPartition(br, rp, s.cl.decompressor, s.cl.cfg.hooks, s.cl.cfg.recordsPool)
 		if fp.Err != nil {
 			if moving := kmove.maybeAddFetchPartition(resp, rp, partOffset.from); moving {
 				strip(topic, partition, fp.Err)
@@ -1245,7 +1268,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
 
 // processRespPartition processes all records in all potentially compressed
 // batches (or message sets).
-func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchResponseTopicPartition, decompressor *decompressor, hooks hooks) FetchPartition {
+func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchResponseTopicPartition, decompressor *decompressor, hooks hooks, recordsPool recordsPool) FetchPartition {
 	fp := FetchPartition{
 		Partition: rp.Partition,
 		Err:       kerr.ErrorForCode(rp.ErrorCode),
@@ -1377,7 +1400,7 @@ func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchRespon
 		case *kmsg.RecordBatch:
 			m.CompressedBytes = len(t.Records) // for record batches, we only track the record batch length
 			m.CompressionType = uint8(t.Attributes) & 0b0000_0111
-			m.NumRecords, m.UncompressedBytes = o.processRecordBatch(&fp, t, aborter, decompressor)
+			m.NumRecords, m.UncompressedBytes = o.processRecordBatch(&fp, t, aborter, decompressor, recordsPool)
 		}
 
 		if m.UncompressedBytes == 0 {
@@ -1458,6 +1481,7 @@ func (o *cursorOffsetNext) processRecordBatch(
 	batch *kmsg.RecordBatch,
 	aborter aborter,
 	decompressor *decompressor,
+	recordsPool recordsPool,
 ) (int, int) {
 	if batch.Magic != 2 {
 		fp.Err = fmt.Errorf("unknown batch magic %d", batch.Magic)
@@ -1508,6 +1532,7 @@ func (o *cursorOffsetNext) processRecordBatch(
 			fp.Partition,
 			batch,
 			&krecords[i],
+			recordsPool,
 		)
 		o.maybeKeepRecord(fp, record, abortBatch)
 
@@ -1753,6 +1778,7 @@ func recordToRecord(
 	partition int32,
 	batch *kmsg.RecordBatch,
 	record *kmsg.Record,
+	recordsPool recordsPool,
 ) *Record {
 	h := make([]RecordHeader, 0, len(record.Headers))
 	for _, kv := range record.Headers {
@@ -1761,19 +1787,20 @@ func recordToRecord(
 			Value: kv.Value,
 		})
 	}
+	r := recordsPool.get()
+
+	r.Key = record.Key
+	r.Value = record.Value
+	r.Headers = h
+	r.Topic = topic
+	r.Partition = partition
+	r.Attrs = RecordAttrs{uint8(batch.Attributes)}
+	r.ProducerID = batch.ProducerID
+	r.ProducerEpoch = batch.ProducerEpoch
+	r.LeaderEpoch = batch.PartitionLeaderEpoch
+	r.Offset = batch.FirstOffset + int64(record.OffsetDelta)
+	r.recordsPool = recordsPool
 
-	r := &Record{
-		Key:           record.Key,
-		Value:         record.Value,
-		Headers:       h,
-		Topic:         topic,
-		Partition:     partition,
-		Attrs:         RecordAttrs{uint8(batch.Attributes)},
-		ProducerID:    batch.ProducerID,
-		ProducerEpoch: batch.ProducerEpoch,
-		LeaderEpoch:   batch.PartitionLeaderEpoch,
-		Offset:        batch.FirstOffset + int64(record.OffsetDelta),
-	}
 	if r.Attrs.TimestampType() == 0 {
 		r.Timestamp = timeFromMillis(batch.FirstTimestamp + record.TimestampDelta64)
 	} else {
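For reference, a minimal consumption sketch using the option this diff adds. EnableRecordsPool and Reuse come from the diff above; the rest is standard franz-go consumer API, and the broker address and topic name are hypothetical placeholders:

```go
package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // hypothetical broker address
		kgo.ConsumeTopics("events"),       // hypothetical topic
		kgo.EnableRecordsPool(),           // fetched records now come from the pool
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	for {
		fetches := cl.PollFetches(context.Background())
		if errs := fetches.Errors(); len(errs) > 0 {
			panic(fmt.Sprint(errs))
		}
		fetches.EachRecord(func(r *kgo.Record) {
			fmt.Println(string(r.Value))
			// Hand the record back to the pool; r must not be
			// referenced after this call.
			r.Reuse()
		})
	}
}
```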
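A note on the design: recordsPool is passed by value and its zero value holds a nil *sync.Pool, so clients that never call EnableRecordsPool pay only a nil check on the fetch path: get falls back to a plain &Record{} allocation and put is a no-op. This keeps the call sites threaded through processRespPartition, processRecordBatch, and recordToRecord free of any branching on whether pooling is enabled.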