*kgo.Record pooling support #827

Closed
wants to merge 2 commits
14 changes: 14 additions & 0 deletions pkg/kgo/config.go
@@ -151,6 +151,8 @@ type cfg struct {
partitions map[string]map[int32]Offset // partitions to directly consume from
regex bool

recordsPool recordsPool

////////////////////////////
// CONSUMER GROUP SECTION //
////////////////////////////
@@ -1347,6 +1349,18 @@ func ConsumeRegex() ConsumerOpt {
return consumerOpt{func(cfg *cfg) { cfg.regex = true }}
}

// EnableRecordsPool sets the client to obtain *kgo.Record objects from a pool,
// in order to minimize the number of allocations.
//
// With this option enabled, records returned by PollFetches/PollRecords
// can be sent back to the pool via the Reuse method so that they are recycled.
//
// This option is particularly useful for workloads that consume a very high
// volume of records, where the extra GC overhead of per-record allocations
// can negatively impact performance.
func EnableRecordsPool() ConsumerOpt {
Author comment:

Truth is, I'm not 100% convinced about adding this option. For simplicity, we could always fetch the records directly from the pool, since if the user doesn't explicitly invoke the Reuse method, these will essentially always end up being allocated from the heap.

return consumerOpt{func(cfg *cfg) { cfg.recordsPool = newRecordsPool() }}
}
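A minimal consumption sketch of how this option would be used, assuming the API lands as shown in this diff (EnableRecordsPool and Record.Reuse); the broker address, topic name, and the value-handling placeholder are illustrative only:

```go
package main

import (
	"context"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // assumed broker address
		kgo.ConsumeTopics("events"),       // assumed topic name
		kgo.EnableRecordsPool(),           // records now come from a pool (this PR)
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	for {
		fetches := cl.PollFetches(context.Background())
		if fetches.IsClientClosed() {
			return
		}
		fetches.EachRecord(func(r *kgo.Record) {
			_ = r.Value // process the record here; do not retain r afterwards
			r.Reuse()   // hand the record back to the pool (this PR)
		})
	}
}
```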

// DisableFetchSessions sets the client to not use fetch sessions (Kafka 1.0+).
//
// A "fetch session" is is a way to reduce bandwidth for fetch requests &
13 changes: 13 additions & 0 deletions pkg/kgo/record_and_fetch.go
@@ -149,6 +149,19 @@ type Record struct {
// producer hooks. It can also be set in a consumer hook to propagate
// enrichment to consumer clients.
Context context.Context

// recordsPool is the pool that this record was fetched from, if any.
//
// When reused, the record is returned to this pool.
recordsPool recordsPool
}

// Reuse releases the record back to the pool.
//
// Once this method has been called, the caller must treat any reference to the
// record as invalid, as the record may be handed out again by future calls to
// PollFetches/PollRecords.
func (r *Record) Reuse() {
r.recordsPool.put(r)
}
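Because a reused record can be handed out again by a later PollFetches/PollRecords call, a caller that needs any of the record's data past the current iteration should copy it before calling Reuse. A conservative sketch of that pattern (the keep slice and the copy are illustrative, not part of this change):

```go
var keep [][]byte
fetches.EachRecord(func(r *kgo.Record) {
	// Copy anything needed later; r must be treated as invalid after Reuse.
	v := append([]byte(nil), r.Value...)
	keep = append(keep, v)
	r.Reuse()
})
```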

func (r *Record) userSize() int64 {
57 changes: 42 additions & 15 deletions pkg/kgo/source.go
Expand Up @@ -16,6 +16,29 @@ import (
"github.com/twmb/franz-go/pkg/kmsg"
)

type recordsPool struct{ p *sync.Pool }

func newRecordsPool() recordsPool {
return recordsPool{
p: &sync.Pool{New: func() any { return &Record{} }},
}
}

func (p recordsPool) get() *Record {
if p.p == nil {
return &Record{}
}
return p.p.Get().(*Record)
}

func (p recordsPool) put(r *Record) {
if p.p == nil {
return
}
*r = Record{} // zero out the record
p.p.Put(r)
}
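Note that the zero value of recordsPool (a nil *sync.Pool) behaves as a no-op, so the plumbing below works whether or not EnableRecordsPool was set: get falls back to a plain allocation and put silently discards the record. An illustrative in-package sketch of the two modes (not part of this diff):

```go
var disabled recordsPool    // zero value: pooling off
r := disabled.get()         // plain &Record{} allocation
disabled.put(r)             // no-op

enabled := newRecordsPool() // pooling on
r = enabled.get()
r.Value = []byte("payload")
enabled.put(r)              // record is zeroed and returned to the sync.Pool
r = enabled.get()           // may return the same, now-zeroed *Record
```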

type readerFrom interface {
ReadFrom([]byte) error
}
@@ -1068,7 +1091,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
continue
}

fp := partOffset.processRespPartition(br, rp, s.cl.decompressor, s.cl.cfg.hooks, s.cl.cfg.recordsPool)
if fp.Err != nil {
if moving := kmove.maybeAddFetchPartition(resp, rp, partOffset.from); moving {
strip(topic, partition, fp.Err)
@@ -1245,7 +1268,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe

// processRespPartition processes all records in all potentially compressed
// batches (or message sets).
func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchResponseTopicPartition, decompressor *decompressor, hooks hooks, recordsPool recordsPool) FetchPartition {
fp := FetchPartition{
Partition: rp.Partition,
Err: kerr.ErrorForCode(rp.ErrorCode),
@@ -1377,7 +1400,7 @@ func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchRespon
case *kmsg.RecordBatch:
m.CompressedBytes = len(t.Records) // for record batches, we only track the record batch length
m.CompressionType = uint8(t.Attributes) & 0b0000_0111
m.NumRecords, m.UncompressedBytes = o.processRecordBatch(&fp, t, aborter, decompressor, recordsPool)
}

if m.UncompressedBytes == 0 {
@@ -1458,6 +1481,7 @@ func (o *cursorOffsetNext) processRecordBatch(
batch *kmsg.RecordBatch,
aborter aborter,
decompressor *decompressor,
recordsPool recordsPool,
) (int, int) {
if batch.Magic != 2 {
fp.Err = fmt.Errorf("unknown batch magic %d", batch.Magic)
@@ -1508,6 +1532,7 @@ func (o *cursorOffsetNext) processRecordBatch(
fp.Partition,
batch,
&krecords[i],
recordsPool,
)
o.maybeKeepRecord(fp, record, abortBatch)

@@ -1753,6 +1778,7 @@ func recordToRecord(
partition int32,
batch *kmsg.RecordBatch,
record *kmsg.Record,
recordsPool recordsPool,
) *Record {
h := make([]RecordHeader, 0, len(record.Headers))
for _, kv := range record.Headers {
@@ -1761,19 +1787,20 @@
Value: kv.Value,
})
}
r := recordsPool.get()

r.Key = record.Key
r.Value = record.Value
r.Headers = h
r.Topic = topic
r.Partition = partition
r.Attrs = RecordAttrs{uint8(batch.Attributes)}
r.ProducerID = batch.ProducerID
r.ProducerEpoch = batch.ProducerEpoch
r.LeaderEpoch = batch.PartitionLeaderEpoch
r.Offset = batch.FirstOffset + int64(record.OffsetDelta)
r.recordsPool = recordsPool

if r.Attrs.TimestampType() == 0 {
r.Timestamp = timeFromMillis(batch.FirstTimestamp + record.TimestampDelta64)
} else {